[FLINK-7182] Activate checkstyle flink-java/functions

This closes #4333.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec1044d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec1044d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec1044d9

Branch: refs/heads/master
Commit: ec1044d985f24b9c1e45d1d25ef060a504e1695d
Parents: 137463c
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Fri Jul 14 10:28:14 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Jul 18 16:15:00 2017 +0200

----------------------------------------------------------------------
 .../flink/api/java/functions/FirstReducer.java  |  12 +-
 .../api/java/functions/FlatMapIterator.java     |  18 +-
 .../api/java/functions/FormattingMapper.java    |   4 +
 .../api/java/functions/FunctionAnnotation.java  | 108 ++++++-----
 .../api/java/functions/GroupReduceIterator.java |  17 +-
 .../flink/api/java/functions/IdPartitioner.java |   3 +
 .../api/java/functions/SampleInCoordinator.java |   3 +-
 .../api/java/functions/SampleInPartition.java   |   4 +-
 .../api/java/functions/SampleWithFraction.java  |   1 +
 .../api/java/functions/SelectByMaxFunction.java |  17 +-
 .../api/java/functions/SelectByMinFunction.java |  19 +-
 .../api/java/functions/SemanticPropUtil.java    | 188 +++++++++----------
 .../api/java/functions/ClosureCleanerTest.java  |   6 +-
 .../java/functions/SelectByFunctionsTest.java   |  69 +++----
 .../java/functions/SemanticPropUtilTest.java    |  60 +++---
 .../SemanticPropertiesPrecedenceTest.java       |  17 +-
 .../SemanticPropertiesProjectionTest.java       |  24 ++-
 .../SemanticPropertiesTranslationTest.java      | 118 ++++++------
 18 files changed, 363 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
index 2063a12..db2bf43 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FirstReducer.java
@@ -23,6 +23,10 @@ import 
org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.util.Collector;
 
+/**
+ * Reducer that only emits the first N elements in a group.
+ * @param <T> The type of the elements in the group.
+ */
 @Internal
 public class FirstReducer<T> implements GroupReduceFunction<T, T>, 
GroupCombineFunction<T, T> {
        private static final long serialVersionUID = 1L;
@@ -37,16 +41,16 @@ public class FirstReducer<T> implements 
GroupReduceFunction<T, T>, GroupCombineF
        public void reduce(Iterable<T> values, Collector<T> out) throws 
Exception {
 
                int emitCnt = 0;
-               for(T val : values) {
+               for (T val : values) {
                        out.collect(val);
-                       
+
                        emitCnt++;
-                       if(emitCnt == count) {
+                       if (emitCnt == count) {
                                break;
                        }
                }
        }
-       
+
        @Override
        public void combine(Iterable<T> values, Collector<T> out) throws 
Exception {
                reduce(values, out);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
index 8f47d8f..be4daef 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FlatMapIterator.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.util.Collector;
 
+import java.util.Iterator;
+
 /**
  * A convenience variant of the {@link 
org.apache.flink.api.common.functions.RichFlatMapFunction} that returns 
elements through an iterator, rather than
  * through a collector. In all other respects, it behaves exactly like the 
FlatMapFunction.
- * <p>
- * The function needs to be serializable, as defined in {@link 
java.io.Serializable}.
- * 
+ *
+ * <p>The function needs to be serializable, as defined in {@link 
java.io.Serializable}.
+ *
  * @param <IN> Type of the input elements.
  * @param <OUT> Type of the returned elements.
  */
@@ -41,17 +41,17 @@ public abstract class FlatMapIterator<IN, OUT> extends 
RichFlatMapFunction<IN, O
        /**
         * The core method of the function. Takes an element from the input 
data set and transforms
         * it into zero, one, or more elements.
-        * 
+        *
         * @param value The input value.
         * @return An iterator over the returned elements.
-        * 
+        *
         * @throws Exception This method may throw exceptions. Throwing an 
exception will cause the operation
         *                   to fail and may trigger recovery.
         */
        public abstract Iterator<OUT> flatMap(IN value) throws Exception;
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Delegates calls to the {@link #flatMap(Object)} method.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
index 95ae4a2..5d01018 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FormattingMapper.java
@@ -23,6 +23,10 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 
+/**
+ * Mapper that converts values to strings using a {@link TextFormatter}.
+ * @param <T> The type of the values to be formatted as strings.
+ */
 @Internal
 public class FormattingMapper<T> implements MapFunction<T, String> {
        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index f01d9d8..93746fb 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -47,11 +47,10 @@ import java.util.Set;
  * }
  * }</pre>
  *
- * <p>
- * All annotations take Strings with expressions that refer to (nested) value 
fields of the input and output types of a function.
+ *
+ * <p>All annotations take Strings with expressions that refer to (nested) 
value fields of the input and output types of a function.
 * Field expressions for composite data types (tuples, POJOs, Scala case 
classes) can be expressed in
  * different ways, depending on the data type they refer to.
- *</p>
  *
  * <ul>
  *     <li>Java tuple data types (such as {@link 
org.apache.flink.api.java.tuple.Tuple3}): A tuple field can be addressed using
@@ -65,14 +64,13 @@ import java.util.Set;
  * of a case class that describes a 2d-coordinate.</li>
  * </ul>
  *
- * <p>
- * Nested fields are addressed by navigation, e.g., <code>"f1.xValue"</code> 
addresses the field <code>xValue</code> of a POJO type,
+ *
+ * <p>Nested fields are addressed by navigation, e.g., 
<code>"f1.xValue"</code> addresses the field <code>xValue</code> of a POJO type,
  * that is stored at the second field of a Java tuple. In order to refer to 
all fields of a composite type (or the composite type itself)
  * such as a tuple, POJO, or case class type, a <code>"*"</code> wildcard can 
be used, e.g., <code>f2.*</code> or <code>f2</code> reference all fields
  * of a composite type at the third position of a Java tuple.
- * </p>
  *
- * <b>NOTE: The use of semantic annotation is optional!
+ * <p><b>NOTE: The use of semantic annotation is optional!
  * If used correctly, semantic annotations can help the Flink optimizer to 
generate more efficient execution plans.
  * However, incorrect semantic annotations can cause the optimizer to generate 
incorrect execution plans which compute wrong results!
  * So be careful when adding semantic annotations.
@@ -86,27 +84,27 @@ public class FunctionAnnotation {
         * The ForwardedFields annotation declares fields which are never 
modified by the annotated function and
         * which are forwarded at the same position to the output or unchanged 
copied to another position in the output.
         *
-        * Fields that are forwarded at the same position can be specified by 
their position.
+        * <p>Fields that are forwarded at the same position can be specified 
by their position.
         * The specified position must be valid for the input and output data 
type and have the same type.
         * For example {@code {@literal @}ForwardedFields({"f2"})} declares 
that the third field of a Java input tuple is
         * copied to the third field of an output tuple.
         *
-        * Fields which are unchanged copied to another position in the output 
are declared by specifying the
+        * <p>Fields which are unchanged copied to another position in the 
output are declared by specifying the
         * source field expression in the input and the target field expression 
in the output.
         * {@code {@literal @}ForwardedFields({"f0->f2"})} denotes that the 
first field of the Java input tuple is
         * unchanged copied to the third field of the Java output tuple. When 
using the wildcard ("*") ensure that
         * the number of declared fields and their types in input and output 
type match.
         *
-        * Multiple forwarded fields can be annotated in one ({@code {@literal 
@}ForwardedFields({"f2; f3->f0; f4"})})
+        * <p>Multiple forwarded fields can be annotated in one ({@code 
{@literal @}ForwardedFields({"f2; f3->f0; f4"})})
         * or separate Strings ({@code {@literal @}ForwardedFields({"f2", 
"f3->f0", "f4"})}).
         *
-        * <b>NOTE: The use of the ForwardedFields annotation is optional.
+        * <p><b>NOTE: The use of the ForwardedFields annotation is optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
         * </b>
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         */
@@ -121,30 +119,30 @@ public class FunctionAnnotation {
         * never modified by the annotated function and which are forwarded at 
the same position to the
         * output or unchanged copied to another position in the output.
         *
-        * Fields that are forwarded from the first input at the same position 
in the output can be
+        * <p>Fields that are forwarded from the first input at the same 
position in the output can be
         * specified by their position. The specified position must be valid 
for the input and output data type and have the same type.
         * For example {@code {@literal @}ForwardedFieldsFirst({"f2"})} 
declares that the third field of a Java input tuple at the first input is
         * copied to the third field of an output tuple.
         *
-        * Fields which are unchanged copied to another position in the output 
are declared by specifying the
+        * <p>Fields which are unchanged copied to another position in the 
output are declared by specifying the
         * source field expression in the input and the target field expression 
in the output.
         * {@code {@literal @}ForwardedFieldsFirst({"f0->f2"})} denotes that 
the first field of the Java input tuple at the first input is
         * unchanged copied to the third field of the Java output tuple. When 
using the wildcard ("*") ensure that
         * the number of declared fields and their types in input and output 
type match.
         *
-        * Multiple forwarded fields can be annotated in one ({@code {@literal 
@}ForwardedFieldsFirst({"f2; f3->f0; f4"})})
+        * <p>Multiple forwarded fields can be annotated in one ({@code 
{@literal @}ForwardedFieldsFirst({"f2; f3->f0; f4"})})
         * or separate Strings ({@code {@literal @}ForwardedFieldsFirst({"f2", 
"f3->f0", "f4"})}).
         *
-        * <b>NOTE: The use of the ForwardedFieldsFirst annotation is optional.
+        * <p><b>NOTE: The use of the ForwardedFieldsFirst annotation is 
optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
         * </b>
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
-        * Forwarded fields from the second input can be specified using the
+        * <p>Forwarded fields from the second input can be specified using the
         * {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} 
annotation.
         *
         */
@@ -159,30 +157,30 @@ public class FunctionAnnotation {
         * never modified by the annotated function and which are forwarded at 
the same position to the
         * output or unchanged copied to another position in the output.
         *
-        * Fields that are forwarded from the second input at the same position 
in the output can be
+        * <p>Fields that are forwarded from the second input at the same 
position in the output can be
         * specified by their position. The specified position must be valid 
for the input and output data type and have the same type.
         * For example {@code {@literal @}ForwardedFieldsSecond({"f2"})} 
declares that the third field of a Java input tuple at the second input is
         * copied to the third field of an output tuple.
         *
-        * Fields which are unchanged copied to another position in the output 
are declared by specifying the
+        * <p>Fields which are unchanged copied to another position in the 
output are declared by specifying the
         * source field expression in the input and the target field expression 
in the output.
         * {@code {@literal @}ForwardedFieldsSecond({"f0->f2"})} denotes that 
the first field of the Java input tuple at the second input is
         * unchanged copied to the third field of the Java output tuple. When 
using the wildcard ("*") ensure that
         * the number of declared fields and their types in input and output 
type match.
         *
-        * Multiple forwarded fields can be annotated in one ({@code {@literal 
@}ForwardedFieldsSecond({"f2; f3->f0; f4"})})
+        * <p>Multiple forwarded fields can be annotated in one ({@code 
{@literal @}ForwardedFieldsSecond({"f2; f3->f0; f4"})})
         * or separate Strings ({@code {@literal @}ForwardedFieldsSecond({"f2", 
"f3->f0", "f4"})}).
         *
-        * <b>NOTE: The use of the ForwardedFieldsSecond annotation is optional.
+        * <p><b>NOTE: The use of the ForwardedFieldsSecond annotation is 
optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * It is NOT required that all forwarded fields are declared, but all 
declarations must be correct.
         * </b>
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
-        * Forwarded fields from the first input can be specified using the
+        * <p>Forwarded fields from the first input can be specified using the
         * {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} 
annotation.
         *
         */
@@ -197,20 +195,20 @@ public class FunctionAnnotation {
         * ALL other fields are considered to be unmodified at the same 
position.
         * Hence, the NonForwardedFields annotation is inverse to the {@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} 
annotation.
         *
-        * <b>NOTE: The use of the NonForwardedFields annotation is optional.
+        * <p><b>NOTE: The use of the NonForwardedFields annotation is optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * Since all not declared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields are declared.
         * </b>
         *
-        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFields({"f1; f3"})</code>
+        * <p>Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFields({"f1; f3"})</code>
         * declares that the second and fourth field of a Java tuple are 
modified and all other fields are not changed and remain
         * on their position. A NonForwardedFields annotation can only be used 
on functions where the type of the input and output are identical.
         *
-        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFields({"f1; f3"})</code>)
+        * <p>Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFields({"f1; f3"})</code>)
         * or separate Strings (<code>\@NonForwardedFields({"f1", 
"f3"})</code>).
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
@@ -227,21 +225,21 @@ public class FunctionAnnotation {
         * ALL other fields are considered to be unmodified at the same 
position.
         * Hence, the NonForwardedFieldsFirst annotation is inverse to the 
{@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} 
annotation.
         *
-        * <b>NOTE: The use of the NonForwardedFieldsFirst annotation is 
optional.
+        * <p><b>NOTE: The use of the NonForwardedFieldsFirst annotation is 
optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * Since all not declared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields of the first input are declared.
         * </b>
         *
-        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code>
+        * <p>Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsFirst({"f1; f3"})</code>
         * declares that the second and fourth field of a Java tuple from the 
first input are modified and
 * all other fields of the first input are not changed and remain 
on their position.
         * A NonForwardedFieldsFirst annotation can only be used on functions 
where the type of the first input and the output are identical.
         *
-        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>)
+        * <p>Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsFirst({"f1; f3"})</code>)
         * or separate Strings (<code>\@NonForwardedFieldsFirst({"f1", 
"f3"})</code>).
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
@@ -259,21 +257,21 @@ public class FunctionAnnotation {
         * ALL other fields are considered to be unmodified at the same 
position.
         * Hence, the NonForwardedFieldsSecond annotation is inverse to the 
{@link 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} 
annotation.
         *
-        * <b>NOTE: The use of the NonForwardedFieldsSecond annotation is 
optional.
+        * <p><b>NOTE: The use of the NonForwardedFieldsSecond annotation is 
optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * However if used incorrectly, it can cause invalid plan choices and 
the computation of wrong results!
         * Since all not declared fields are considered to be forwarded, it is 
required that ALL non-forwarded fields of the second input are declared.
         * </b>
         *
-        * Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code>
+        * <p>Non-forwarded fields are declared as a list of field expressions, 
e.g., <code>\@NonForwardedFieldsSecond({"f1; f3"})</code>
         * declares that the second and fourth field of a Java tuple from the 
second input are modified and
 * all other fields of the second input are not changed and remain 
on their position.
         * A NonForwardedFieldsSecond annotation can only be used on functions 
where the type of the second input and the output are identical.
         *
-        * Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>)
+        * <p>Multiple non-forwarded fields can be annotated in one 
(<code>\@NonForwardedFieldsSecond({"f1; f3"})</code>)
         * or separate Strings (<code>\@NonForwardedFieldsSecond({"f1", 
"f3"})</code>).
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         * @see 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
@@ -291,20 +289,20 @@ public class FunctionAnnotation {
         * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
         * Fields which are only unmodified copied to the output without 
evaluating their values are NOT considered to be read.
         *
-        * <b>NOTE: The use of the ReadFields annotation is optional.
+        * <p><b>NOTE: The use of the ReadFields annotation is optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * The ReadFields annotation requires that ALL read fields are declared.
         * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
         * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
         * </b>
         *
-        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFields({"f0; f2"})</code> declares the first and third
+        * <p>Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFields({"f0; f2"})</code> declares the first and third
         * field of a Java input tuple to be read. All other fields are 
considered to not influence the behavior of the function.
         *
-        * Multiple read fields can be declared in one <code>\@ReadFields({"f0; 
f2"})</code> or
+        * <p>Multiple read fields can be declared in one 
<code>\@ReadFields({"f0; f2"})</code> or
         * multiple separate Strings <code>\@ReadFields({"f0", "f2"})</code>.
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         */
@@ -321,21 +319,21 @@ public class FunctionAnnotation {
         * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
         * Fields which are only unmodified copied to the output without 
evaluating their values are NOT considered to be read.
         *
-        * <b>NOTE: The use of the ReadFieldsFirst annotation is optional.
+        * <p><b>NOTE: The use of the ReadFieldsFirst annotation is optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * The ReadFieldsFirst annotation requires that ALL read fields of the 
first input are declared.
         * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
         * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
         * </b>
         *
-        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third
+        * <p>Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsFirst({"f0; f2"})</code> declares the first and third
         * field of a Java input tuple of the first input to be read.
         * All other fields of the first input are considered to not influence 
the behavior of the function.
         *
-        * Multiple read fields can be declared in one 
<code>\@ReadFieldsFirst({"f0; f2"})</code> or
+        * <p>Multiple read fields can be declared in one 
<code>\@ReadFieldsFirst({"f0; f2"})</code> or
         * multiple separate Strings <code>\@ReadFieldsFirst({"f0", 
"f2"})</code>.
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         */
@@ -352,21 +350,21 @@ public class FunctionAnnotation {
         * For example, fields which are evaluated in conditional statements or 
used for computations are considered to be read.
         * Fields which are only unmodified copied to the output without 
evaluating their values are NOT considered to be read.
         *
-        * <b>NOTE: The use of the ReadFieldsSecond annotation is optional.
+        * <p><b>NOTE: The use of the ReadFieldsSecond annotation is optional.
         * If used correctly, it can help the Flink optimizer to generate more 
efficient execution plans.
         * The ReadFieldsSecond annotation requires that ALL read fields of the 
second input are declared.
         * Otherwise, it can cause invalid plan choices and the computation of 
wrong results!
         * Declaring a non-read field as read is not harmful but might reduce 
optimization potential.
         * </b>
         *
-        * Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third
+        * <p>Read fields are declared as a list of field expressions, e.g., 
<code>\@ReadFieldsSecond({"f0; f2"})</code> declares the first and third
         * field of a Java input tuple of the second input to be read.
         * All other fields of the second input are considered to not influence 
the behavior of the function.
         *
-        * Multiple read fields can be declared in one 
<code>\@ReadFieldsSecond({"f0; f2"})</code> or
+        * <p>Multiple read fields can be declared in one 
<code>\@ReadFieldsSecond({"f0; f2"})</code> or
         * multiple separate Strings <code>\@ReadFieldsSecond({"f0", 
"f2"})</code>.
         *
-        * Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
+        * <p>Please refer to the JavaDoc of {@link 
org.apache.flink.api.common.functions.Function} or Flink's documentation for
         * details on field expressions such as nested fields and wildcard.
         *
         */
@@ -381,7 +379,7 @@ public class FunctionAnnotation {
         * The SkipCodeAnalysis annotation declares that a function will not be 
analyzed by Flink's
         * code analysis capabilities independent of the configured {@link 
org.apache.flink.api.common.CodeAnalysisMode}.
         *
-        * If this annotation is not present the static code analyzer 
pre-interprets user-defined
+        * <p>If this annotation is not present the static code analyzer 
pre-interprets user-defined
         * functions in order to get implementation insights for program 
improvements that can be
         * printed to the log as hints, automatically applied, or disabled (see
         * {@link org.apache.flink.api.common.ExecutionConfig}).
@@ -404,7 +402,7 @@ public class FunctionAnnotation {
 
        /**
         * Reads the annotations of a user defined function with one input and 
returns semantic properties according to the forwarded fields annotated.
-        * 
+        *
         * @param udfClass The user defined function, represented by its class.
         * @return      The DualInputSemanticProperties containing the 
forwarded fields.
         */
@@ -415,17 +413,17 @@ public class FunctionAnnotation {
                ReadFields readSet = udfClass.getAnnotation(ReadFields.class);
 
                Set<Annotation> annotations = new HashSet<Annotation>();
-               if(forwardedFields != null) {
+               if (forwardedFields != null) {
                        annotations.add(forwardedFields);
                }
-               if(nonForwardedFields != null) {
-                       if(!annotations.isEmpty()) {
+               if (nonForwardedFields != null) {
+                       if (!annotations.isEmpty()) {
                                throw new InvalidProgramException("Either " + 
ForwardedFields.class.getSimpleName() + " or " +
                                                
NonForwardedFields.class.getSimpleName() + " can be annotated to a function, 
not both.");
                        }
                        annotations.add(nonForwardedFields);
                }
-               if(readSet != null) {
+               if (readSet != null) {
                        annotations.add(readSet);
                }
 
@@ -443,7 +441,7 @@ public class FunctionAnnotation {
 
                // get readSet annotation from stub
                ForwardedFieldsFirst forwardedFields1 = 
udfClass.getAnnotation(ForwardedFieldsFirst.class);
-               ForwardedFieldsSecond forwardedFields2= 
udfClass.getAnnotation(ForwardedFieldsSecond.class);
+               ForwardedFieldsSecond forwardedFields2 = 
udfClass.getAnnotation(ForwardedFieldsSecond.class);
 
                // get readSet annotation from stub
                NonForwardedFieldsFirst nonForwardedFields1 = 
udfClass.getAnnotation(NonForwardedFieldsFirst.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
index 722d99b..b381049 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/GroupReduceIterator.java
@@ -18,23 +18,26 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.Iterator;
-
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.util.Collector;
 
+import java.util.Iterator;
+
+/**
+ * Base class that simplifies reducing all values provided as {@link Iterable}.
+ * @param <IN>
+ * @param <OUT>
+ */
 @PublicEvolving
 public abstract class GroupReduceIterator<IN, OUT> extends 
RichGroupReduceFunction<IN, OUT> {
-       
-       private static final long serialVersionUID = 1L;
 
+       private static final long serialVersionUID = 1L;
 
        public abstract Iterator<OUT> reduceGroup(Iterable<IN> values) throws 
Exception;
-       
-       
+
        // 
-------------------------------------------------------------------------------------------
-       
+
        @Override
        public final void reduce(Iterable<IN> values, Collector<OUT> out) 
throws Exception {
                for (Iterator<OUT> iter = reduceGroup(values); iter.hasNext(); 
) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
index 31caf86..4104eda 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/IdPartitioner.java
@@ -21,6 +21,9 @@ package org.apache.flink.api.java.functions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Partitioner;
 
+/**
+ * Partitioner that partitions by id.
+ */
 @Internal
 public class IdPartitioner implements Partitioner<Integer> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
index 61b28af..31bf211 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInCoordinator.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.sampling.DistributedRandomSampler;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement;
 import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
index d524200..d38c0fe 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleInPartition.java
@@ -15,12 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.sampling.DistributedRandomSampler;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
 import org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement;
 import org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement;
 import org.apache.flink.util.Collector;
@@ -70,4 +71,3 @@ public class SampleInPartition<T> extends 
RichMapPartitionFunction<T, Intermedia
        }
 }
 
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
index 04730f2..358cda7 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SampleWithFraction.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
index 6e806db..e756c63 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;
@@ -22,13 +23,17 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
+/**
+ * Function that enables selection by maximal value of a field.
+ * @param <T>
+ */
 @Internal
 public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> 
{
        private static final long serialVersionUID = 1L;
-       
+
        // Fields which are used as KEYS
        private int[] fields;
-       
+
        /**
         * Constructor which is overwriting the default constructor.
         * @param type Types of tuple whether to check if given fields are key 
types.
@@ -38,7 +43,7 @@ public class SelectByMaxFunction<T extends Tuple> implements 
ReduceFunction<T> {
         */
        public SelectByMaxFunction(TupleTypeInfo<T> type, int... fields) {
                this.fields = fields;
-               
+
                // Check correctness of each position
                for (int field : fields) {
                        // Is field inside array
@@ -57,12 +62,12 @@ public class SelectByMaxFunction<T extends Tuple> 
implements ReduceFunction<T> {
        }
 
        /**
-        * Reduce implementation, returns bigger tuple or value1 if both tuples 
are 
+        * Reduce implementation, returns bigger tuple or value1 if both tuples 
are
         * equal. Comparison highly depends on the order and amount of fields 
chosen
         * as indices. All given fields (at construction time) are checked in 
the same
-        * order as defined (at construction time). If both tuples are equal in 
one 
+        * order as defined (at construction time). If both tuples are equal in 
one
         * index, the next index is compared. Or if no next index is available 
value1
-        * is returned. 
+        * is returned.
         * The tuple which has a bigger value at one index will be returned.
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
index fdd5f7f..1ea4117 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;
@@ -22,13 +23,17 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
+/**
+ * Function that enables selection by minimal value of a field.
+ * @param <T>
+ */
 @Internal
 public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> 
{
        private static final long serialVersionUID = 1L;
-       
+
        // Fields which are used as KEYS
        private int[] fields;
-       
+
        /**
         * Constructor which is overwriting the default constructor.
         * @param type Types of tuple whether to check if given fields are key 
types.
@@ -38,7 +43,7 @@ public class SelectByMinFunction<T extends Tuple> implements 
ReduceFunction<T> {
         */
        public SelectByMinFunction(TupleTypeInfo<T> type, int... fields) {
                this.fields = fields;
-               
+
                // Check correctness of each position
                for (int field : fields) {
                        // Is field inside array
@@ -55,14 +60,14 @@ public class SelectByMinFunction<T extends Tuple> 
implements ReduceFunction<T> {
 
                }
        }
-       
+
        /**
-        * Reduce implementation, returns smaller tuple or value1 if both 
tuples are 
+        * Reduce implementation, returns smaller tuple or value1 if both 
tuples are
         * equal. Comparison highly depends on the order and amount of fields 
chosen
         * as indices. All given fields (at construction time) are checked in 
the same
-        * order as defined (at construction time). If both tuples are equal in 
one 
+        * order as defined (at construction time). If both tuples are equal in 
one
         * index, the next index is compared. Or if no next index is available 
value1
-        * is returned. 
+        * is returned.
         * The tuple which has a smaller value at one index will be returned.
         */
        @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index aedba15..e254d94 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -16,19 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
+import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import 
org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
-import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
+import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
@@ -38,7 +38,6 @@ import 
org.apache.flink.api.java.functions.FunctionAnnotation.NonForwardedFields
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ReadFieldsSecond;
-import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 import java.lang.annotation.Annotation;
@@ -55,14 +54,14 @@ import java.util.regex.Pattern;
 @Internal
 public final class SemanticPropUtil {
 
-       private final static String REGEX_WILDCARD = "[\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR+"\\"+ 
Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA+"]";
-       private final static String REGEX_SINGLE_FIELD = 
"[\\p{L}\\p{Digit}_\\$]+";
-       private final static String REGEX_NESTED_FIELDS = "((" + 
REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\."+ REGEX_WILDCARD 
+")?";
+       private static final String REGEX_WILDCARD = "[\\" + 
Keys.ExpressionKeys.SELECT_ALL_CHAR + "\\" + 
Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA + "]";
+       private static final String REGEX_SINGLE_FIELD = 
"[\\p{L}\\p{Digit}_\\$]+";
+       private static final String REGEX_NESTED_FIELDS = "((" + 
REGEX_SINGLE_FIELD + "\\.)*" + REGEX_SINGLE_FIELD + ")(\\." + REGEX_WILDCARD + 
")?";
 
-       private final static String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + 
";)*(" + REGEX_NESTED_FIELDS + ");?)";
-       private final static String REGEX_FORWARD = "(("+ REGEX_NESTED_FIELDS 
+"|"+ REGEX_WILDCARD +")->(" + REGEX_NESTED_FIELDS + "|"+ REGEX_WILDCARD +"))";
-       private final static String REGEX_FIELD_OR_FORWARD = "(" + 
REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")";
-       private final static String REGEX_ANNOTATION = "((" + 
REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)";
+       private static final String REGEX_LIST = "((" + REGEX_NESTED_FIELDS + 
";)*(" + REGEX_NESTED_FIELDS + ");?)";
+       private static final String REGEX_FORWARD = "((" + REGEX_NESTED_FIELDS 
+ "|" + REGEX_WILDCARD + ")->(" + REGEX_NESTED_FIELDS + "|" + REGEX_WILDCARD + 
"))";
+       private static final String REGEX_FIELD_OR_FORWARD = "(" + 
REGEX_NESTED_FIELDS + "|" + REGEX_FORWARD + ")";
+       private static final String REGEX_ANNOTATION = "((" + 
REGEX_FIELD_OR_FORWARD + ";)*(" + REGEX_FIELD_OR_FORWARD + ");?)";
 
        private static final Pattern PATTERN_WILDCARD = 
Pattern.compile(REGEX_WILDCARD);
        private static final Pattern PATTERN_FORWARD = 
Pattern.compile(REGEX_FORWARD);
@@ -70,8 +69,7 @@ public final class SemanticPropUtil {
        private static final Pattern PATTERN_LIST = Pattern.compile(REGEX_LIST);
        private static final Pattern PATTERN_FIELD = 
Pattern.compile(REGEX_NESTED_FIELDS);
 
-       public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType)
-       {
+       public static SingleInputSemanticProperties 
createProjectionPropertiesSingle(int[] fields, CompositeType<?> inType) {
 
                Character.isJavaIdentifierStart(1);
 
@@ -79,17 +77,17 @@ public final class SemanticPropUtil {
 
                int[] sourceOffsets = new int[inType.getArity()];
                sourceOffsets[0] = 0;
-               for(int i=1; i<inType.getArity(); i++) {
-                       sourceOffsets[i] = 
inType.getTypeAt(i-1).getTotalFields() + sourceOffsets[i-1];
+               for (int i = 1; i < inType.getArity(); i++) {
+                       sourceOffsets[i] = inType.getTypeAt(i - 
1).getTotalFields() + sourceOffsets[i - 1];
                }
 
                int targetOffset = 0;
-               for(int i=0; i<fields.length; i++) {
+               for (int i = 0; i < fields.length; i++) {
                        int sourceOffset = sourceOffsets[fields[i]];
                        int numFieldsToCopy = 
inType.getTypeAt(fields[i]).getTotalFields();
 
-                       for(int j=0; j<numFieldsToCopy; j++) {
-                               ssp.addForwardedField(sourceOffset+j, 
targetOffset+j);
+                       for (int j = 0; j < numFieldsToCopy; j++) {
+                               ssp.addForwardedField(sourceOffset + j, 
targetOffset + j);
                        }
                        targetOffset += numFieldsToCopy;
                }
@@ -98,45 +96,44 @@ public final class SemanticPropUtil {
        }
 
        public static DualInputSemanticProperties 
createProjectionPropertiesDual(
-                       int[] fields, boolean[] isFromFirst, TypeInformation<?> 
inType1, TypeInformation<?> inType2)
-       {
+               int[] fields, boolean[] isFromFirst, TypeInformation<?> 
inType1, TypeInformation<?> inType2) {
                DualInputSemanticProperties dsp = new 
DualInputSemanticProperties();
 
                int[] sourceOffsets1;
-               if(inType1 instanceof TupleTypeInfo<?>) {
+               if (inType1 instanceof TupleTypeInfo<?>) {
                        sourceOffsets1 = new int[inType1.getArity()];
                        sourceOffsets1[0] = 0;
-                       for(int i=1; i<inType1.getArity(); i++) {
-                               sourceOffsets1[i] = 
((TupleTypeInfo<?>)inType1).getTypeAt(i-1).getTotalFields() + 
sourceOffsets1[i-1];
+                       for (int i = 1; i < inType1.getArity(); i++) {
+                               sourceOffsets1[i] = ((TupleTypeInfo<?>) 
inType1).getTypeAt(i - 1).getTotalFields() + sourceOffsets1[i - 1];
                        }
                } else {
-                       sourceOffsets1 = new int[] {0};
+                       sourceOffsets1 = new int[]{0};
                }
 
                int[] sourceOffsets2;
-               if(inType2 instanceof TupleTypeInfo<?>) {
+               if (inType2 instanceof TupleTypeInfo<?>) {
                        sourceOffsets2 = new int[inType2.getArity()];
                        sourceOffsets2[0] = 0;
-                       for(int i=1; i<inType2.getArity(); i++) {
-                               sourceOffsets2[i] = 
((TupleTypeInfo<?>)inType2).getTypeAt(i-1).getTotalFields() + 
sourceOffsets2[i-1];
+                       for (int i = 1; i < inType2.getArity(); i++) {
+                               sourceOffsets2[i] = ((TupleTypeInfo<?>) 
inType2).getTypeAt(i - 1).getTotalFields() + sourceOffsets2[i - 1];
                        }
                } else {
-                       sourceOffsets2 = new int[] {0};
+                       sourceOffsets2 = new int[]{0};
                }
 
                int targetOffset = 0;
-               for(int i=0; i<fields.length; i++) {
+               for (int i = 0; i < fields.length; i++) {
                        int sourceOffset;
                        int numFieldsToCopy;
                        int input;
-                       if(isFromFirst[i]) {
+                       if (isFromFirst[i]) {
                                input = 0;
                                if (fields[i] == -1) {
                                        sourceOffset = 0;
                                        numFieldsToCopy = 
inType1.getTotalFields();
                                } else {
                                        sourceOffset = 
sourceOffsets1[fields[i]];
-                                       numFieldsToCopy = 
((TupleTypeInfo<?>)inType1).getTypeAt(fields[i]).getTotalFields();
+                                       numFieldsToCopy = ((TupleTypeInfo<?>) 
inType1).getTypeAt(fields[i]).getTotalFields();
                                }
                        } else {
                                input = 1;
@@ -145,12 +142,12 @@ public final class SemanticPropUtil {
                                        numFieldsToCopy = 
inType2.getTotalFields();
                                } else {
                                        sourceOffset = 
sourceOffsets2[fields[i]];
-                                       numFieldsToCopy = 
((TupleTypeInfo<?>)inType2).getTypeAt(fields[i]).getTotalFields();
+                                       numFieldsToCopy = ((TupleTypeInfo<?>) 
inType2).getTypeAt(fields[i]).getTotalFields();
                                }
                        }
 
-                       for(int j=0; j<numFieldsToCopy; j++) {
-                               dsp.addForwardedField(input, sourceOffset+j, 
targetOffset+j);
+                       for (int j = 0; j < numFieldsToCopy; j++) {
+                               dsp.addForwardedField(input, sourceOffset + j, 
targetOffset + j);
                        }
                        targetOffset += numFieldsToCopy;
                }
@@ -203,33 +200,33 @@ public final class SemanticPropUtil {
                DualInputSemanticProperties offsetProps = new 
DualInputSemanticProperties();
 
                // add offset to read fields on first input
-               if(props.getReadFields(0) != null) {
+               if (props.getReadFields(0) != null) {
                        FieldSet offsetReadFields = new FieldSet();
-                       for(int r : props.getReadFields(0)) {
-                               offsetReadFields = 
offsetReadFields.addField(r+offset1);
+                       for (int r : props.getReadFields(0)) {
+                               offsetReadFields = offsetReadFields.addField(r 
+ offset1);
                        }
                        offsetProps.addReadFields(0, offsetReadFields);
                }
                // add offset to read fields on second input
-               if(props.getReadFields(1) != null) {
+               if (props.getReadFields(1) != null) {
                        FieldSet offsetReadFields = new FieldSet();
-                       for(int r : props.getReadFields(1)) {
-                               offsetReadFields = 
offsetReadFields.addField(r+offset2);
+                       for (int r : props.getReadFields(1)) {
+                               offsetReadFields = offsetReadFields.addField(r 
+ offset2);
                        }
                        offsetProps.addReadFields(1, offsetReadFields);
                }
 
                // add offset to forward fields on first input
-               for(int s = 0; s < numInputFields1; s++) {
+               for (int s = 0; s < numInputFields1; s++) {
                        FieldSet targetFields = 
props.getForwardingTargetFields(0, s);
-                       for(int t : targetFields) {
+                       for (int t : targetFields) {
                                offsetProps.addForwardedField(0, s + offset1, 
t);
                        }
                }
                // add offset to forward fields on second input
-               for(int s = 0; s < numInputFields2; s++) {
+               for (int s = 0; s < numInputFields2; s++) {
                        FieldSet targetFields = 
props.getForwardingTargetFields(1, s);
-                       for(int t : targetFields) {
+                       for (int t : targetFields) {
                                offsetProps.addForwardedField(1, s + offset2, 
t);
                        }
                }
@@ -261,11 +258,11 @@ public final class SemanticPropUtil {
                        } else if (ann instanceof ForwardedFieldsFirst || ann 
instanceof ForwardedFieldsSecond ||
                                        ann instanceof NonForwardedFieldsFirst 
|| ann instanceof NonForwardedFieldsSecond ||
                                        ann instanceof ReadFieldsFirst || ann 
instanceof ReadFieldsSecond) {
-                               throw new 
InvalidSemanticAnnotationException("Annotation "+ann.getClass()+" invalid for 
single input function.");
+                               throw new 
InvalidSemanticAnnotationException("Annotation " + ann.getClass() + " invalid 
for single input function.");
                        }
                }
 
-               if(forwarded != null || nonForwarded != null || read != null) {
+               if (forwarded != null || nonForwarded != null || read != null) {
                        SingleInputSemanticProperties result = new 
SingleInputSemanticProperties();
                        getSemanticPropsSingleFromString(result, forwarded, 
nonForwarded, read, inType, outType);
                        return result;
@@ -307,11 +304,11 @@ public final class SemanticPropUtil {
                        }
                }
 
-               if(forwardedFirst != null || nonForwardedFirst != null || 
readFirst != null ||
-                               forwardedSecond != null ||nonForwardedSecond != 
null || readSecond != null) {
+               if (forwardedFirst != null || nonForwardedFirst != null || 
readFirst != null ||
+                               forwardedSecond != null || nonForwardedSecond 
!= null || readSecond != null) {
                        DualInputSemanticProperties result = new 
DualInputSemanticProperties();
                        getSemanticPropsDualFromString(result, forwardedFirst, 
forwardedSecond,
-                                       nonForwardedFirst, nonForwardedSecond, 
readFirst, readSecond, inType1, inType2, outType);
+                               nonForwardedFirst, nonForwardedSecond, 
readFirst, readSecond, inType1, inType2, outType);
                        return result;
                }
                return null;
@@ -331,20 +328,20 @@ public final class SemanticPropUtil {
                boolean hasForwardedAnnotation = false;
                boolean hasNonForwardedAnnotation = false;
                // check for forwarded annotations
-               if(forwarded != null && forwarded.length > 0) {
+               if (forwarded != null && forwarded.length > 0) {
                        hasForwardedAnnotation = true;
                }
                // check non-forwarded annotations
-               if(nonForwarded != null && nonForwarded.length > 0) {
+               if (nonForwarded != null && nonForwarded.length > 0) {
                        hasNonForwardedAnnotation = true;
                }
 
-               if(hasForwardedAnnotation && hasNonForwardedAnnotation) {
+               if (hasForwardedAnnotation && hasNonForwardedAnnotation) {
                        throw new InvalidSemanticAnnotationException("Either 
ForwardedFields OR " +
                                        "NonForwardedFields annotation 
permitted, NOT both.");
-               } else if(hasForwardedAnnotation) {
+               } else if (hasForwardedAnnotation) {
                        parseForwardedFields(result, forwarded, inType, 
outType, 0, skipIncompatibleTypes);
-               } else if(hasNonForwardedAnnotation) {
+               } else if (hasNonForwardedAnnotation) {
                        parseNonForwardedFields(result, nonForwarded, inType, 
outType, 0, skipIncompatibleTypes);
                }
                parseReadFields(result, readSet, inType, 0);
@@ -356,8 +353,8 @@ public final class SemanticPropUtil {
                        readFieldsFirst, String[] readFieldsSecond,
                        TypeInformation<?> inType1, TypeInformation<?> inType2, 
TypeInformation<?> outType) {
                getSemanticPropsDualFromString(result, forwardedFirst, 
forwardedSecond, nonForwardedFirst,
-                               nonForwardedSecond, readFieldsFirst, 
readFieldsSecond, inType1, inType2,  outType,
-                               false);
+                       nonForwardedSecond, readFieldsFirst, readFieldsSecond, 
inType1, inType2, outType,
+                       false);
        }
 
        public static void 
getSemanticPropsDualFromString(DualInputSemanticProperties result,
@@ -372,38 +369,38 @@ public final class SemanticPropUtil {
                boolean hasNonForwardedFirstAnnotation = false;
                boolean hasNonForwardedSecondAnnotation = false;
                // check for forwarded annotations
-               if(forwardedFirst != null && forwardedFirst.length > 0) {
+               if (forwardedFirst != null && forwardedFirst.length > 0) {
                        hasForwardedFirstAnnotation = true;
                }
-               if(forwardedSecond != null && forwardedSecond.length > 0) {
+               if (forwardedSecond != null && forwardedSecond.length > 0) {
                        hasForwardedSecondAnnotation = true;
                }
                // check non-forwarded annotations
-               if(nonForwardedFirst != null && nonForwardedFirst.length > 0) {
+               if (nonForwardedFirst != null && nonForwardedFirst.length > 0) {
                        hasNonForwardedFirstAnnotation = true;
                }
-               if(nonForwardedSecond != null && nonForwardedSecond.length > 0) 
{
+               if (nonForwardedSecond != null && nonForwardedSecond.length > 
0) {
                        hasNonForwardedSecondAnnotation = true;
                }
 
-               if(hasForwardedFirstAnnotation && 
hasNonForwardedFirstAnnotation) {
+               if (hasForwardedFirstAnnotation && 
hasNonForwardedFirstAnnotation) {
                        throw new InvalidSemanticAnnotationException("Either 
ForwardedFieldsFirst OR " +
                                        "NonForwardedFieldsFirst annotation 
permitted, NOT both.");
                }
-               if(hasForwardedSecondAnnotation && 
hasNonForwardedSecondAnnotation) {
+               if (hasForwardedSecondAnnotation && 
hasNonForwardedSecondAnnotation) {
                        throw new InvalidSemanticAnnotationException("Either 
ForwardedFieldsSecond OR " +
                                        "NonForwardedFieldsSecond annotation 
permitted, NOT both.");
                }
 
-               if(hasForwardedFirstAnnotation) {
+               if (hasForwardedFirstAnnotation) {
                        parseForwardedFields(result, forwardedFirst, inType1, 
outType, 0, skipIncompatibleTypes);
-               } else if(hasNonForwardedFirstAnnotation) {
+               } else if (hasNonForwardedFirstAnnotation) {
                        parseNonForwardedFields(result, nonForwardedFirst, 
inType1, outType, 0, skipIncompatibleTypes);
                }
 
-               if(hasForwardedSecondAnnotation) {
+               if (hasForwardedSecondAnnotation) {
                        parseForwardedFields(result, forwardedSecond, inType2, 
outType, 1, skipIncompatibleTypes);
-               } else if(hasNonForwardedSecondAnnotation) {
+               } else if (hasNonForwardedSecondAnnotation) {
                        parseNonForwardedFields(result, nonForwardedSecond, 
inType2, outType, 1, skipIncompatibleTypes);
                }
 
@@ -411,7 +408,6 @@ public final class SemanticPropUtil {
                parseReadFields(result, readFieldsSecond, inType2, 1);
        }
 
-
        private static void parseForwardedFields(SemanticProperties sp, 
String[] forwardedStr,
                        TypeInformation<?> inType, TypeInformation<?> outType, 
int input, boolean skipIncompatibleTypes) {
 
@@ -434,8 +430,7 @@ public final class SemanticPropUtil {
                                if (!inType.equals(outType)) {
                                        if (skipIncompatibleTypes) {
                                                continue;
-                                       }
-                                       else {
+                                       } else {
                                                throw new 
InvalidSemanticAnnotationException("Forwarded field annotation \"" + s +
                                                                "\" with 
wildcard only allowed for identical input and output types.");
                                        }
@@ -468,8 +463,7 @@ public final class SemanticPropUtil {
                                        if (!areFieldsCompatible(sourceStr, 
inType, targetStr, outType, !skipIncompatibleTypes)) {
                                                if (skipIncompatibleTypes) {
                                                        continue;
-                                               }
-                                               else {
+                                               } else {
                                                        throw new 
InvalidSemanticAnnotationException("Referenced fields of forwarded field 
annotation \"" + s + "\" do not match.");
                                                }
                                        }
@@ -511,8 +505,7 @@ public final class SemanticPropUtil {
                                                if 
(!areFieldsCompatible(fieldStr, inType, fieldStr, outType, 
!skipIncompatibleTypes)) {
                                                        if 
(skipIncompatibleTypes) {
                                                                continue;
-                                                       }
-                                                       else {
+                                                       } else {
                                                                throw new 
InvalidSemanticAnnotationException("Referenced fields of forwarded field 
annotation \"" + s + "\" do not match.");
                                                        }
                                                }
@@ -541,12 +534,12 @@ public final class SemanticPropUtil {
        private static void parseNonForwardedFields(SemanticProperties sp, 
String[] nonForwardedStr,
                        TypeInformation<?> inType, TypeInformation<?> outType, 
int input, boolean skipIncompatibleTypes) {
 
-               if(nonForwardedStr == null) {
+               if (nonForwardedStr == null) {
                        return;
                }
 
                FieldSet excludedFields = new FieldSet();
-               for(String s : nonForwardedStr) {
+               for (String s : nonForwardedStr) {
 
                        // remove white characters
                        s = s.replaceAll("\\s", "");
@@ -555,18 +548,17 @@ public final class SemanticPropUtil {
                                continue;
                        }
 
-                       if(!inType.equals(outType)) {
+                       if (!inType.equals(outType)) {
                                if (skipIncompatibleTypes) {
                                        continue;
-                               }
-                               else {
+                               } else {
                                        throw new 
InvalidSemanticAnnotationException("Non-forwarded fields annotation only 
allowed for identical input and output types.");
                                }
                        }
 
                        Matcher matcher = PATTERN_LIST.matcher(s);
                        if (!matcher.matches()) {
-                               throw new 
InvalidSemanticAnnotationException("Invalid format of non-forwarded fields 
annotation \""+s+"\".");
+                               throw new 
InvalidSemanticAnnotationException("Invalid format of non-forwarded fields 
annotation \"" + s + "\".");
                        }
 
                        // process individual fields
@@ -580,17 +572,17 @@ public final class SemanticPropUtil {
                                        for (FlatFieldDescriptor ffd : inFFDs) {
                                                excludedFields = 
excludedFields.addField(ffd.getPosition());
                                        }
-                               } catch(InvalidFieldReferenceException ifre) {
-                                       throw new 
InvalidSemanticAnnotationException("Invalid field reference in non-forwarded 
fields annotation \""+fieldStr+"\".",ifre);
+                               } catch (InvalidFieldReferenceException ifre) {
+                                       throw new 
InvalidSemanticAnnotationException("Invalid field reference in non-forwarded 
fields annotation \"" + fieldStr + "\".", ifre);
                                }
                        }
                }
 
-               for(int i = 0; i < inType.getTotalFields(); i++) {
-                       if(!excludedFields.contains(i)) {
-                               if(sp instanceof SingleInputSemanticProperties) 
{
-                                       ((SingleInputSemanticProperties) 
sp).addForwardedField(i,i);
-                               } else if(sp instanceof 
DualInputSemanticProperties) {
+               for (int i = 0; i < inType.getTotalFields(); i++) {
+                       if (!excludedFields.contains(i)) {
+                               if (sp instanceof 
SingleInputSemanticProperties) {
+                                       ((SingleInputSemanticProperties) 
sp).addForwardedField(i, i);
+                               } else if (sp instanceof 
DualInputSemanticProperties) {
                                        ((DualInputSemanticProperties) 
sp).addForwardedField(input, i, i);
                                }
                        }
@@ -600,11 +592,11 @@ public final class SemanticPropUtil {
 
        private static void parseReadFields(SemanticProperties sp, String[] 
readFieldStrings, TypeInformation<?> inType, int input) {
 
-               if(readFieldStrings == null) {
+               if (readFieldStrings == null) {
                        return;
                }
 
-               for(String s : readFieldStrings) {
+               for (String s : readFieldStrings) {
 
                        FieldSet readFields = new FieldSet();
 
@@ -662,12 +654,10 @@ public final class SemanticPropUtil {
                        // get target type information
                        TypeInformation<?> targetType = 
getExpressionTypeInformation(targetField, outType);
                        return sourceType.equals(targetType);
-               }
-               catch (InvalidFieldReferenceException e) {
+               } catch (InvalidFieldReferenceException e) {
                        if (throwException) {
                                throw e;
-                       }
-                       else {
+                       } else {
                                return false;
                        }
                }
@@ -675,30 +665,30 @@ public final class SemanticPropUtil {
 
        private static TypeInformation<?> getExpressionTypeInformation(String 
fieldStr, TypeInformation<?> typeInfo) {
                Matcher wildcardMatcher = PATTERN_WILDCARD.matcher(fieldStr);
-               if(wildcardMatcher.matches()) {
+               if (wildcardMatcher.matches()) {
                        return typeInfo;
                } else {
                        Matcher expMatcher = PATTERN_FIELD.matcher(fieldStr);
-                       if(!expMatcher.matches()) {
-                               throw new 
InvalidFieldReferenceException("Invalid field expression \""+fieldStr+"\".");
+                       if (!expMatcher.matches()) {
+                               throw new 
InvalidFieldReferenceException("Invalid field expression \"" + fieldStr + 
"\".");
                        }
-                       if(typeInfo instanceof CompositeType<?>) {
+                       if (typeInfo instanceof CompositeType<?>) {
                                return ((CompositeType<?>) 
typeInfo).getTypeAt(expMatcher.group(1));
                        } else {
-                               throw new 
InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not 
possible on atomic type ("+typeInfo+").");
+                               throw new 
InvalidFieldReferenceException("Nested field expression \"" + fieldStr + "\" 
not possible on atomic type (" + typeInfo + ").");
                        }
                }
        }
 
        private static List<FlatFieldDescriptor> getFlatFields(String fieldStr, 
TypeInformation<?> typeInfo) {
-               if(typeInfo instanceof CompositeType<?>) {
+               if (typeInfo instanceof CompositeType<?>) {
                        return ((CompositeType<?>) 
typeInfo).getFlatFields(fieldStr);
                } else {
                        Matcher wildcardMatcher = 
PATTERN_WILDCARD.matcher(fieldStr);
-                       if(wildcardMatcher.matches()) {
+                       if (wildcardMatcher.matches()) {
                                return Collections.singletonList(new 
FlatFieldDescriptor(0, typeInfo));
                        } else {
-                               throw new 
InvalidFieldReferenceException("Nested field expression \""+fieldStr+"\" not 
possible on atomic type ("+typeInfo+").");
+                               throw new 
InvalidFieldReferenceException("Nested field expression \"" + fieldStr + "\" 
not possible on atomic type (" + typeInfo + ").");
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 496c61a..04f9b52 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -15,16 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ClosureCleaner;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.Serializable;
 
+/**
+ * Tests for {@link ClosureCleaner}.
+ */
 public class ClosureCleanerTest {
 
        @Test(expected = InvalidProgramException.class)
@@ -195,4 +200,3 @@ class NestedNonSerializableMapCreator implements MapCreator 
{
 
 }
 
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
index 52b59ec..3cfd74e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
@@ -15,36 +15,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Tests for {@link SelectByMaxFunction} and {@link SelectByMinFunction}.
+ */
 public class SelectByFunctionsTest {
-       
+
        private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, 
Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, 
Integer>>(
                        BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO);
-       
+
        private final Tuple5<Integer, Long, String, Long, Integer> bigger = new 
Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 20);
        private final Tuple5<Integer, Long, String, Long, Integer> smaller = 
new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 15);
-       
+
        //Special case where only the last value determines if bigger or smaller
        private final Tuple5<Integer, Long, String, Long, Integer> 
specialCaseBigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, 
"HelloWorld", 200L, 17);
        private final Tuple5<Integer, Long, String, Long, Integer> 
specialCaseSmaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, 
"Hello", 50L, 17);
-       
-       
+
        /**
         * This test validates whether the order of tuples has any impact on 
the outcome and if the bigger tuple is returned.
         */
        @Test
        public void testMaxByComparison() {
                SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0});
-               
+
                try {
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(smaller, bigger));
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(bigger, smaller));
@@ -52,17 +56,17 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        // ----------------------- MAXIMUM FUNCTION TEST BELOW 
--------------------------
-       
+
        /**
         * This test cases checks when two tuples only differ in one value, but 
this value is not
         * in the fields list. In that case it should be seen as equal and then 
the first given tuple (value1) should be returned by reduce().
         */
        @Test
        public void testMaxByComparisonSpecialCase1() {
-               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,3});
-               
+               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 3});
+
                try {
                        Assert.assertSame("SelectByMax must return the first 
given tuple", specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger));
                        Assert.assertSame("SelectByMax must return the first 
given tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
@@ -70,14 +74,14 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
         * This test cases checks when two tuples only differ in one value.
         */
        @Test
        public void testMaxByComparisonSpecialCase2() {
-               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
-               
+               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 2, 1, 4, 3});
+
                try {
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(specialCaseBigger, bigger));
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
@@ -85,14 +89,14 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
         * This test validates that equality is independent of the amount of 
used indices.
         */
        @Test
        public void testMaxByComparisonMultiple() {
-               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
-               
+               SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 1, 2, 3, 4});
+
                try {
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(smaller, bigger));
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(bigger, smaller));
@@ -100,14 +104,14 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
-        * Checks whether reduce does behave as expected if both values are the 
same object. 
+        * Checks whether reduce does behave as expected if both values are the 
same object.
         */
        @Test
        public void testMaxByComparisonMustReturnATuple() {
                SelectByMaxFunction<Tuple5<Integer, Long, String, Long, 
Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0});
-               
+
                try {
                        Assert.assertSame("SelectByMax must return bigger 
tuple", bigger, maxByTuple.reduce(bigger, bigger));
                        Assert.assertSame("SelectByMax must return smaller 
tuple", smaller, maxByTuple.reduce(smaller, smaller));
@@ -115,16 +119,16 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        // ----------------------- MINIMUM FUNCTION TEST BELOW 
--------------------------
-       
+
        /**
         * This test validates whether the order of tuples has any impact on 
the outcome and if the smaller tuple is returned.
         */
        @Test
        public void testMinByComparison() {
                SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0});
-               
+
                try {
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(smaller, bigger));
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(bigger, smaller));
@@ -132,15 +136,15 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
         * This test cases checks when two tuples only differ in one value, but 
this value is not
         * in the fields list. In that case it should be seen as equal and then 
the first given tuple (value1) should be returned by reduce().
         */
        @Test
        public void testMinByComparisonSpecialCase1() {
-               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,3});
-               
+               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 3});
+
                try {
                        Assert.assertSame("SelectByMin must return the first 
given tuple", specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger));
                        Assert.assertSame("SelectByMin must return the first 
given tuple", bigger, minByTuple.reduce(bigger, specialCaseBigger));
@@ -148,15 +152,15 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
         * This test validates that when two tuples only differ in one value 
and that value's index is given
         * at construction time. The smaller tuple must be returned then.
         */
        @Test
        public void testMinByComparisonSpecialCase2() {
-               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
-               
+               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 2, 1, 4, 3});
+
                try {
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(specialCaseSmaller, smaller));
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(smaller, specialCaseSmaller));
@@ -164,14 +168,14 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
+
        /**
-        * Checks whether reduce does behave as expected if both values are the 
same object. 
+        * Checks whether reduce does behave as expected if both values are the 
same object.
         */
        @Test
        public void testMinByComparisonMultiple() {
-               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
-               
+               SelectByMinFunction<Tuple5<Integer, Long, String, Long, 
Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, 
Long, Integer>>(tupleTypeInfo, new int[] {0, 1, 2, 3, 4});
+
                try {
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(smaller, bigger));
                        Assert.assertSame("SelectByMin must return smaller 
tuple", smaller, minByTuple.reduce(bigger, smaller));
@@ -179,6 +183,5 @@ public class SelectByFunctionsTest {
                        Assert.fail("No exception should be thrown while 
comparing both tuples");
                }
        }
-       
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec1044d9/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
index 453c022..8c4c32d 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.functions;
 
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
@@ -32,12 +31,16 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for semantic properties utils.
+ */
 public class SemanticPropUtilTest {
-       
+
        private final TypeInformation<?> threeIntTupleType = new 
TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO,
                        BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO);
 
@@ -74,13 +77,13 @@ public class SemanticPropUtilTest {
        @Test
        public void testSingleProjectionProperties() {
 
-               int[] pMap = new int[] {3,0,4};
+               int[] pMap = new int[] {3, 0, 4};
                SingleInputSemanticProperties sp = 
SemanticPropUtil.createProjectionPropertiesSingle(pMap, (CompositeType<?>) 
fiveIntTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
                assertTrue(sp.getForwardingTargetFields(0, 3).contains(0));
                assertTrue(sp.getForwardingTargetFields(0, 4).contains(2));
 
-               pMap = new int[] {2,2,1,1};
+               pMap = new int[] {2, 2, 1, 1};
                sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, 
(CompositeType<?>) fiveIntTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 1).size() == 2);
                assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
@@ -89,14 +92,14 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(1));
 
-               pMap = new int[] {2,0};
+               pMap = new int[] {2, 0};
                sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, 
(CompositeType<?>) nestedTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
                assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(3));
 
-               pMap = new int[] {2,0,1};
+               pMap = new int[] {2, 0, 1};
                sp = SemanticPropUtil.createProjectionPropertiesSingle(pMap, 
(CompositeType<?>) deepNestedTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 6).contains(0));
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(1));
@@ -119,7 +122,7 @@ public class SemanticPropUtilTest {
        @Test
        public void testDualProjectionProperties() {
 
-               int[] pMap = new int[]{4,2,0,1,3,4};
+               int[] pMap = new int[]{4, 2, 0, 1, 3, 4};
                boolean[] iMap = new boolean[]{true, true, false, true, false, 
false};
                DualInputSemanticProperties sp = 
SemanticPropUtil.createProjectionPropertiesDual(pMap, iMap,
                                fiveIntTupleType, fiveIntTupleType);
@@ -130,7 +133,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(1, 3).contains(4));
                assertTrue(sp.getForwardingTargetFields(1, 4).contains(5));
 
-               pMap = new int[]{4,2,0,4,0,1};
+               pMap = new int[]{4, 2, 0, 4, 0, 1};
                iMap = new boolean[]{true, true, false, true, false, false};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, fiveIntTupleType, fiveIntTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 4).size() == 2);
@@ -142,7 +145,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(1, 0).contains(4));
                assertTrue(sp.getForwardingTargetFields(1, 1).contains(5));
 
-               pMap = new int[]{2,1,0,1};
+               pMap = new int[]{2, 1, 0, 1};
                iMap = new boolean[]{false, false, true, true};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, nestedTupleType, threeIntTupleType);
                assertTrue(sp.getForwardingTargetFields(1, 2).contains(0));
@@ -152,7 +155,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(4));
                assertTrue(sp.getForwardingTargetFields(0, 3).contains(5));
 
-               pMap = new int[]{1,0,0};
+               pMap = new int[]{1, 0, 0};
                iMap = new boolean[]{false, false, true};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, nestedTupleType, deepNestedTupleType);
                assertTrue(sp.getForwardingTargetFields(1, 1).contains(0));
@@ -165,7 +168,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(0, 1).contains(7));
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(8));
 
-               pMap = new int[]{4,2,1,0};
+               pMap = new int[]{4, 2, 1, 0};
                iMap = new boolean[]{true, false, true, false};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, fiveIntTupleType, pojoInTupleType);
                assertTrue(sp.getForwardingTargetFields(0, 4).contains(0));
@@ -176,7 +179,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(0, 1).contains(5));
                assertTrue(sp.getForwardingTargetFields(1, 0).contains(6));
 
-               pMap = new int[]{2,3,-1,0};
+               pMap = new int[]{2, 3, -1, 0};
                iMap = new boolean[]{true, true, false, true};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, fiveIntTupleType, intType);
                assertTrue(sp.getForwardingTargetFields(0, 2).contains(0));
@@ -184,7 +187,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(1, 0).contains(2));
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(3));
 
-               pMap = new int[]{-1,-1};
+               pMap = new int[]{-1, -1};
                iMap = new boolean[]{false, true};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, intType, nestedPojoType);
                assertTrue(sp.getForwardingTargetFields(1, 0).contains(0));
@@ -195,7 +198,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(1, 5).contains(5));
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(6));
 
-               pMap = new int[]{-1,-1};
+               pMap = new int[]{-1, -1};
                iMap = new boolean[]{true, false};
                sp = SemanticPropUtil.createProjectionPropertiesDual(pMap, 
iMap, intType, nestedPojoType);
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
@@ -220,7 +223,7 @@ public class SemanticPropUtilTest {
                semProps.addForwardedField(0, 4);
                semProps.addForwardedField(2, 0);
                semProps.addForwardedField(4, 3);
-               semProps.addReadFields(new FieldSet(0,3));
+               semProps.addReadFields(new FieldSet(0, 3));
 
                SemanticProperties offsetProps = 
SemanticPropUtil.addSourceFieldOffset(semProps, 5, 0);
 
@@ -325,7 +328,7 @@ public class SemanticPropUtilTest {
 
        @Test
        public void testForwardedNoArrowIndividualStrings() {
-               String[] forwardedFields = {"f2","f3","f0"};
+               String[] forwardedFields = {"f2", "f3", "f0"};
                SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
                SemanticPropUtil.getSemanticPropsSingleFromString(sp, 
forwardedFields, null, null, fiveIntTupleType, fiveIntTupleType);
 
@@ -405,7 +408,7 @@ public class SemanticPropUtilTest {
                assertTrue(sp.getForwardingTargetFields(0, 0).contains(0));
                assertTrue(sp.getForwardingTargetFields(0, 1).contains(2));
        }
-       
+
        @Test
        public void testForwardedMixedOneString() {
                String[] forwardedFields = {"f2;f3;f0->f4;f4->f0"};
@@ -685,7 +688,7 @@ public class SemanticPropUtilTest {
 
        @Test(expected = InvalidSemanticAnnotationException.class)
        public void testForwardedInvalidTargetFieldType1() {
-               String[] forwardedFields = {"f0->f0","f1->f2"};
+               String[] forwardedFields = {"f0->f0", "f1->f2"};
                SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
                SemanticPropUtil.getSemanticPropsSingleFromString(sp, 
forwardedFields, null, null, fiveIntTupleType, threeMixedTupleType);
        }
@@ -766,7 +769,7 @@ public class SemanticPropUtilTest {
                SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
                SemanticPropUtil.getSemanticPropsSingleFromString(sp, 
forwardedFields, null, null, threeIntTupleType, threeIntTupleType);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        // Non-Forwarded Fields Annotation
        // 
--------------------------------------------------------------------------------------------
@@ -947,8 +950,7 @@ public class SemanticPropUtilTest {
                SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
                SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, 
nonForwardedFields, null, threeIntTupleType, threeIntTupleType);
        }
-       
-       
+
        // 
--------------------------------------------------------------------------------------------
        // Read Fields Annotation
        // 
--------------------------------------------------------------------------------------------
@@ -964,7 +966,7 @@ public class SemanticPropUtilTest {
                assertTrue(fs.contains(2));
                assertTrue(fs.contains(1));
        }
-       
+
        @Test
        public void testReadFieldsOneString() {
                String[] readFields = { "f1;f2" };
@@ -1144,11 +1146,11 @@ public class SemanticPropUtilTest {
                SingleInputSemanticProperties sp = new 
SingleInputSemanticProperties();
                SemanticPropUtil.getSemanticPropsSingleFromString(sp, null, 
null, readFields, threeIntTupleType, threeIntTupleType);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        // Two Inputs
        // 
--------------------------------------------------------------------------------------------
-       
+
        @Test
        public void testForwardedDual() {
                String[] forwardedFieldsFirst = { "f1->f2; f2->f3" };
@@ -1216,7 +1218,6 @@ public class SemanticPropUtilTest {
                assertTrue(dsp.getForwardingTargetFields(1, 3).contains(5));
        }
 
-
        @Test
        public void testNonForwardedDual() {
                String[] nonForwardedFieldsFirst = { "f1;f2" };
@@ -1438,6 +1439,9 @@ public class SemanticPropUtilTest {
        // Pojo Type Classes
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Sample test pojo.
+        */
        public static class TestPojo {
 
                public int int1;
@@ -1446,6 +1450,9 @@ public class SemanticPropUtilTest {
                public String string1;
        }
 
+       /**
+        * Sample test pojo.
+        */
        public static class TestPojo2 {
 
                public int myInt1;
@@ -1454,6 +1461,9 @@ public class SemanticPropUtilTest {
                public String myString1;
        }
 
+       /**
+        * Sample test pojo with nested type.
+        */
        public static class NestedTestPojo{
 
                public int int1;

Reply via email to