[FLINK-2906] Remove Record API

This closes #1403


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c787a037
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c787a037
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c787a037

Branch: refs/heads/master
Commit: c787a037de5bb456844f9704389bff65b4972e18
Parents: 8fddbf0
Author: zentol <ches...@apache.org>
Authored: Tue Nov 24 19:19:42 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Nov 25 23:45:24 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |   4 +-
 docs/internals/fig/projects_dependencies.svg    |   6 -
 .../common/functions/GenericCollectorMap.java   |  33 --
 .../base/CollectorMapOperatorBase.java          |  60 ---
 .../java/org/apache/flink/types/Record.java     |   2 +-
 .../java/record/functions/CoGroupFunction.java  |  57 ---
 .../java/record/functions/CrossFunction.java    |  51 --
 .../record/functions/FunctionAnnotation.java    | 482 -------------------
 .../api/java/record/functions/JoinFunction.java |  44 --
 .../api/java/record/functions/MapFunction.java  |  53 --
 .../record/functions/MapPartitionFunction.java  |  54 ---
 .../java/record/functions/ReduceFunction.java   |  83 ----
 .../java/record/io/CollectionInputFormat.java   |  97 ----
 .../api/java/record/io/CsvInputFormat.java      | 414 ----------------
 .../api/java/record/io/CsvOutputFormat.java     | 479 ------------------
 .../java/record/io/DelimitedInputFormat.java    |  49 --
 .../java/record/io/DelimitedOutputFormat.java   | 319 ------------
 .../ExternalProcessFixedLengthInputFormat.java  | 227 ---------
 .../record/io/ExternalProcessInputFormat.java   | 143 ------
 .../record/io/ExternalProcessInputSplit.java    |  58 ---
 .../api/java/record/io/FileInputFormat.java     |  31 --
 .../api/java/record/io/FileOutputFormat.java    |  32 --
 .../java/record/io/FixedLengthInputFormat.java  | 245 ----------
 .../api/java/record/io/GenericInputFormat.java  |  29 --
 .../api/java/record/io/TextInputFormat.java     | 142 ------
 .../java/record/operators/BulkIteration.java    |  49 --
 .../java/record/operators/CoGroupOperator.java  | 399 ---------------
 .../record/operators/CollectionDataSource.java  | 225 ---------
 .../java/record/operators/CrossOperator.java    | 241 ----------
 .../operators/CrossWithLargeOperator.java       |  98 ----
 .../operators/CrossWithSmallOperator.java       |  98 ----
 .../java/record/operators/DeltaIteration.java   |  99 ----
 .../api/java/record/operators/FileDataSink.java | 236 ---------
 .../java/record/operators/FileDataSource.java   |  78 ---
 .../java/record/operators/GenericDataSink.java  | 250 ----------
 .../record/operators/GenericDataSource.java     |  77 ---
 .../api/java/record/operators/JoinOperator.java | 326 -------------
 .../api/java/record/operators/MapOperator.java  | 201 --------
 .../record/operators/MapPartitionOperator.java  | 200 --------
 .../record/operators/OperatorInfoHelper.java    |  49 --
 .../java/record/operators/ReduceOperator.java   | 407 ----------------
 .../record/CoGroupWrappingFunctionTest.java     | 206 --------
 .../java/record/ReduceWrappingFunctionTest.java | 233 ---------
 .../api/java/record/io/CsvInputFormatTest.java  | 406 ----------------
 .../api/java/record/io/CsvOutputFormatTest.java | 465 ------------------
 ...ternalProcessFixedLengthInputFormatTest.java | 298 ------------
 .../io/ExternalProcessInputFormatTest.java      | 283 -----------
 .../record/io/FixedLenghtInputFormatTest.java   | 212 --------
 .../api/java/record/io/TextInputFormatTest.java | 158 ------
 .../flink/optimizer/costs/CostEstimator.java    |   3 +-
 .../flink/optimizer/dag/CollectorMapNode.java   |  62 ---
 .../operators/CollectorMapDescriptor.java       |  75 ---
 .../plandump/PlanJSONDumpGenerator.java         |   1 -
 .../optimizer/plantranslate/JsonMapper.java     |   1 -
 .../traversals/GraphCreatingVisitor.java        |   4 -
 .../runtime/operators/CollectorMapDriver.java   | 118 -----
 .../flink/runtime/operators/DriverStrategy.java |   3 -
 .../chaining/ChainedCollectorMapDriver.java     |  87 ----
 .../flink/test/util/RecordAPITestBase.java      | 146 ------
 59 files changed, 3 insertions(+), 8985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index fe3edaa..71a5e26 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -3079,9 +3079,7 @@ attribute. Both the command line and the web interface 
support a parameter to pa
 class name manually for cases where the JAR manifest contains neither 
attribute.
 
 2. If the entry point class implements the 
`org.apache.flinkapi.common.Program`, then the system
-calls the `getPlan(String...)` method to obtain the program plan to execute. 
The
-`getPlan(String...)` method was the only possible way of defining a program in 
the *Record API*
-(see [0.4 docs](http://stratosphere.eu/docs/0.4/)) and is also supported in 
the new Java API.
+calls the `getPlan(String...)` method to obtain the program plan to execute.
 
 3. If the entry point class does not implement the 
`org.apache.flinkapi.common.Program` interface,
 the system will invoke the main method of the class.

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/internals/fig/projects_dependencies.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/projects_dependencies.svg 
b/docs/internals/fig/projects_dependencies.svg
index d537ab8..76f6276 100644
--- a/docs/internals/fig/projects_dependencies.svg
+++ b/docs/internals/fig/projects_dependencies.svg
@@ -176,12 +176,6 @@ under the License.
        y="145.73346"
        
style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;"
        id="text3029">Java API</text>
-    <text
-       xml:space="preserve"
-       x="127.97233"
-       y="163.73794"
-       
style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;"
-       id="text3031">Old Record API</text>
     <path
        style="fill:#ffffff;fill-rule:evenodd;fill-opacity:1;stroke:none;"
        d="  M 260.48364 198.25564   C 260.48364 195.0861 263.05303 192.51671 
266.20382 192.51671   L 399.26822 192.51671   C 402.419 192.51671 404.98839 
195.0861 404.98839 198.25564   L 404.98839 221.13634   C 404.98839 224.30588 
402.419 226.87527 399.26822 226.87527   L 266.20382 226.87527   C 263.05303 
226.87527 260.48364 224.30588 260.48364 221.13634   z"

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
deleted file mode 100644
index d335862..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Variant of the flat map that is used for backwards compatibility in the 
deprecated Record-API-
- *
- * @param <T> The input data type.
- * @param <O> The result data type.
- */
-@Deprecated
-public interface GenericCollectorMap<T, O> extends RichFunction {
-       
-       void map(T record, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
deleted file mode 100644
index b7ff2ce..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-
-/**
- * The CollectorMap is the old version of the Map operator. It is effectively 
a "flatMap", where the
- * UDF is called "map".
- * 
- * @see GenericCollectorMap
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class CollectorMapOperatorBase<IN, OUT, FT extends 
GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
-       
-       public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-               super(udf, operatorInfo, name);
-       }
-       
-       public CollectorMapOperatorBase(FT udf, UnaryOperatorInformation<IN, 
OUT> operatorInfo, String name) {
-               super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
-       }
-       
-       public CollectorMapOperatorBase(Class<? extends FT> udf, 
UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-               super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       protected List<OUT> executeOnCollections(List<IN> inputData, 
RuntimeContext ctx, ExecutionConfig executionConfig) {
-               throw new UnsupportedOperationException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java 
b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 8ef972f..7cbbc44 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
 
 
 /**
- * The Record represents a multi-valued data record and forms the base of the 
"Record API"
+ * The Record represents a multi-valued data record.
  * The record is a tuple of arbitrary values. It implements a sparse tuple 
model, meaning that the record can contain
  * many fields which are actually null and not represented in the record. It 
has internally a bitmap marking which fields
  * are set and which are not.

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
deleted file mode 100644
index 1ddf362..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * The CoGroupFunction is the base class for functions that are invoked by a 
{@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
- */
-
-@Deprecated
-public abstract class CoGroupFunction extends AbstractRichFunction {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * This method must be implemented to provide a user implementation of a
-        * matcher. It is called for each two key-value pairs that share the 
same
-        * key and come from different inputs.
-        * 
-        * @param records1 The records from the first input which were paired 
with the key.
-        * @param records2 The records from the second input which were paired 
with the key.
-        * @param out A collector that collects all output pairs.
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
-        *                   decide whether to retry the task execution.
-        */
-       public abstract void coGroup(Iterator<Record> records1, 
Iterator<Record> records2, Collector<Record> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
deleted file mode 100644
index eaf34a0..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * The CrossFunction is the base class for functions that are invoked by a 
{@link org.apache.flink.api.java.record.operators.CrossOperator}.
- */
-@Deprecated
-public abstract class CrossFunction extends AbstractRichFunction implements 
org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * This method must be implemented to provide a user implementation of 
a cross.
-        * It is called for each element of the Cartesian product of both input 
sets.
-
-        * @param first The record from the second input.
-        * @param second The record from the second input.
-        * @return The result of the cross UDF
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the task 
and lets the fail-over logic
-        *                   decide whether to retry the task execution.
-        */
-       @Override
-       public abstract Record cross(Record first, Record second) throws 
Exception;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
deleted file mode 100644
index 71d2a62..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.functions;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * 
- * This class defines the semantic assertions that can be added to functions.
- * The assertions are realized as java annotations, to be added to the class 
declaration of
- * the class that realized the user function. For example, to declare the 
<i>ConstantFieldsExcept</i> 
- * annotation for a map-type function that realizes a simple absolute function,
- * use it the following way:
- * 
- * <pre>{@code
- * {@literal @}ConstantFieldsExcept(fields={2})
- * public class MyMapper extends MapFunction
- * {
- *     public void map(Record record, Collector out)
- *     {
- *        int value = record.getField(2, IntValue.class).getValue();
-               record.setField(2, new IntValue(Math.abs(value)));
-               
-               out.collect(record);
- *     }
- * }
- * }</pre>
- * 
- * Be aware that some annotations should only be used for functions with as 
single input 
- * ({@link MapFunction}, {@link ReduceFunction}) and some only for stubs with 
two inputs 
- * ({@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}).
- */
-@Deprecated
-public class FunctionAnnotation {
-       
-       /**
-        * Specifies the fields of an input record that are unchanged in the 
output of 
-        * a stub with a single input ( {@link MapFunction}, {@link 
ReduceFunction}).
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        * 
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsExcept} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsExcept} annotation 
is not set, it is 
-        * assumed that <i>no</i> field is constant.
-        *
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFields {
-               int[] value();
-       }
-       
-       /**
-        * Specifies that all fields of an input record that are unchanged in 
the output of 
-        * a {@link MapFunction}, or {@link ReduceFunction}).
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        * 
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsExcept} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsExcept} annotation 
is not set, it is 
-        * assumed that <i>no</i> field is constant.
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface AllFieldsConstants {}
-       
-       /**
-        * Specifies the fields of an input record of the first input that are 
unchanged in 
-        * the output of a stub with two inputs ( {@link CrossFunction}, {@link 
JoinFunction}, {@link CoGroupFunction})
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        *
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsFirstExcept} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsFirstExcept} 
annotation is not set, it is 
-        * assumed that <i>no</i> field is constant.
-        * 
-        *
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsFirst {
-               int[] value();
-       }
-       
-       /**
-        * Specifies the fields of an input record of the second input that are 
unchanged in 
-        * the output of a stub with two inputs ( {@link CrossFunction}, {@link 
JoinFunction}, {@link CoGroupFunction})
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        *
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsSecondExcept} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsSecondExcept} 
annotation is not set, it is 
-        * assumed that <i>no</i> field is constant.
-        * 
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsSecond {
-               int[] value();
-       }
-       
-       /**
-        * Specifies the fields of an input record that are changed in the 
output of 
-        * a stub with a single input ( {@link MapFunction}, {@link 
ReduceFunction}). All other 
-        * fields are assumed to be constant.
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        * 
-        * This annotation is mutually exclusive with the {@link 
ConstantFields} annotation.
-        * 
-        * If this annotation and the {@link ConstantFields} annotation is not 
set, it is 
-        * assumed that <i>no</i> field is constant.
-        *
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsExcept {
-               int[] value();
-       }
-       
-       /**
-        * Specifies the fields of an input record of the first input that are 
changed in 
-        * the output of a stub with two inputs ( {@link CrossFunction}, {@link 
JoinFunction}, {@link CoGroupFunction})
-        * All other fields are assumed to be constant.
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        *
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsFirst} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsFirst} annotation is 
not set, it is 
-        * assumed that <i>no</i> field is constant.
-        * 
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsFirstExcept {
-               int[] value();
-       }
-       
-       
-       /**
-        * Specifies the fields of an input record of the second input that are 
changed in 
-        * the output of a stub with two inputs ( {@link CrossFunction}, {@link 
JoinFunction}, {@link CoGroupFunction})
-        * All other fields are assumed to be constant.
-        * 
-        * A field is considered to be constant if its value is not changed and 
copied to the same position of 
-        * output record.
-        * 
-        * <b>
-        * It is very important to follow a conservative strategy when 
specifying constant fields.
-        * Only fields that are always constant (regardless of value, stub 
call, etc.) to the output may be 
-        * inserted! Otherwise, the correct execution of a program can not be 
guaranteed.
-        * So if in doubt, do not add a field to this set.
-        * </b>
-        *
-        * This annotation is mutually exclusive with the {@link 
ConstantFieldsSecond} annotation.
-        * 
-        * If this annotation and the {@link ConstantFieldsSecond} annotation 
is not set, it is 
-        * assumed that <i>no</i> field is constant.
-        * 
-        */
-       @Target(ElementType.TYPE)
-       @Retention(RetentionPolicy.RUNTIME)
-       public @interface ConstantFieldsSecondExcept {
-               int[] value();
-       }
-
-       /**
-        * Private constructor to prevent instantiation. This class is intended 
only as a container.
-        */
-       private FunctionAnnotation() {}
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                   Function Annotation Handling
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static SingleInputSemanticProperties 
readSingleConstantAnnotations(UserCodeWrapper<?> udf) {
-               
-               // get constantSet annotation from stub
-               AllFieldsConstants allConstants = 
udf.getUserCodeAnnotation(AllFieldsConstants.class);
-               ConstantFields constantSet = 
udf.getUserCodeAnnotation(ConstantFields.class);
-               ConstantFieldsExcept notConstantSet = 
udf.getUserCodeAnnotation(ConstantFieldsExcept.class);
-
-               if (notConstantSet != null && (constantSet != null || 
allConstants != null)) {
-                       throw new RuntimeException("Either ConstantFields or 
ConstantFieldsExcept can be specified, not both.");
-               }
-               
-               // extract notConstantSet from annotation
-               if (notConstantSet != null) {
-                       FieldSet nonConstant = new 
FieldSet(notConstantSet.value());
-                       return new 
ImplicitlyForwardingSingleInputSemanticProperties(nonConstant);
-               }
-               
-               // extract notConstantSet from annotation
-               if (allConstants != null) {
-                       return new 
SingleInputSemanticProperties.AllFieldsForwardedProperties();
-               }
-               
-               SingleInputSemanticProperties semanticProperties = new 
SingleInputSemanticProperties();
-               
-               // extract constantSet from annotation
-               if (constantSet != null) {
-                       for (int value: constantSet.value()) {
-                               
semanticProperties.addForwardedField(value,value);
-                       }
-               }
-               
-               return semanticProperties;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public static DualInputSemanticProperties 
readDualConstantAnnotations(UserCodeWrapper<?> udf) {
-               ImplicitlyForwardingTwoInputSemanticProperties 
semanticProperties = new ImplicitlyForwardingTwoInputSemanticProperties();
-
-               // get readSet annotation from stub
-               ConstantFieldsFirst constantSet1Annotation = 
udf.getUserCodeAnnotation(ConstantFieldsFirst.class);
-               ConstantFieldsSecond constantSet2Annotation = 
udf.getUserCodeAnnotation(ConstantFieldsSecond.class);
-               
-               // get readSet annotation from stub
-               ConstantFieldsFirstExcept notConstantSet1Annotation = 
udf.getUserCodeAnnotation(ConstantFieldsFirstExcept.class);
-               ConstantFieldsSecondExcept notConstantSet2Annotation = 
udf.getUserCodeAnnotation(ConstantFieldsSecondExcept.class);
-               
-               
-               if (notConstantSet1Annotation != null && constantSet1Annotation 
!= null) {
-                       throw new RuntimeException("Either ConstantFieldsFirst 
or ConstantFieldsFirstExcept can be specified, not both.");
-               }
-               
-               if (constantSet2Annotation != null && notConstantSet2Annotation 
!= null) {
-                       throw new RuntimeException("Either ConstantFieldsSecond 
or ConstantFieldsSecondExcept can be specified, not both.");
-               }
-               
-               
-               // extract readSets from annotations
-               if(notConstantSet1Annotation != null) {
-                       
semanticProperties.setImplicitlyForwardingFirstExcept(new 
FieldSet(notConstantSet1Annotation.value()));
-               }
-               
-               if(notConstantSet2Annotation != null) {
-                       
semanticProperties.setImplicitlyForwardingSecondExcept(new 
FieldSet(notConstantSet2Annotation.value()));
-               }
-               
-               // extract readSets from annotations
-               if (constantSet1Annotation != null) {
-                       for(int value: constantSet1Annotation.value()) {
-                               semanticProperties.addForwardedField(0, value, 
value);
-                       }
-               }
-               
-               if (constantSet2Annotation != null) {
-                       for(int value: constantSet2Annotation.value()) {
-                               semanticProperties.addForwardedField(1, value, 
value);
-                       }
-               }
-               
-               return semanticProperties;
-       }
-
-
-       private static final class 
ImplicitlyForwardingSingleInputSemanticProperties extends 
SingleInputSemanticProperties {
-
-               private static final long serialVersionUID = 1l;
-
-               private FieldSet nonForwardedFields;
-
-               private 
ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) {
-                       this.nonForwardedFields = nonForwardedFields;
-               }
-
-               @Override
-               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
-
-                       if (input != 0) {
-                               throw new IndexOutOfBoundsException();
-                       }
-
-                       if (nonForwardedFields == null) {
-                               return super.getForwardingTargetFields(input, 
sourceField);
-                       } else {
-                               if 
(this.nonForwardedFields.contains(sourceField)) {
-                                       return FieldSet.EMPTY_SET;
-                               } else {
-                                       return new FieldSet(sourceField);
-                               }
-                       }
-               }
-
-               @Override
-               public int getForwardingSourceField(int input, int targetField) 
{
-
-                       if (input != 0) {
-                               throw new IndexOutOfBoundsException();
-                       }
-
-                       if (nonForwardedFields == null) {
-                               return super.getForwardingSourceField(input, 
targetField);
-                       } else {
-                               if 
(this.nonForwardedFields.contains(targetField)) {
-                                       return -1;
-                               } else {
-                                       return targetField;
-                               }
-                       }
-               }
-
-               @Override
-               public FieldSet getReadFields(int input) {
-                       return null;
-               }
-
-               @Override
-               public void addForwardedField(int sourceField, int 
destinationField) {
-                       if (this.nonForwardedFields == null) {
-                               super.addForwardedField(sourceField, 
destinationField);
-                       } else {
-                               throw new UnsupportedOperationException("When 
defining fields as implicitly constant for an input" +
-                                               "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                       }
-               }
-
-       }
-
-       private static final class 
ImplicitlyForwardingTwoInputSemanticProperties extends 
DualInputSemanticProperties {
-               private static final long serialVersionUID = 1L;
-
-               private FieldSet nonForwardedFields1;
-               private FieldSet nonForwardedFields2;
-
-               private ImplicitlyForwardingTwoInputSemanticProperties() {}
-
-               public void setImplicitlyForwardingFirstExcept(FieldSet 
nonForwardedFields) {
-                       this.nonForwardedFields1 = nonForwardedFields;
-               }
-
-               public void setImplicitlyForwardingSecondExcept(FieldSet 
nonForwardedFields) {
-                       this.nonForwardedFields2 = nonForwardedFields;
-               }
-
-               @Override
-               public FieldSet getForwardingTargetFields(int input, int 
sourceField) {
-
-                       if(input != 0 && input != 1) {
-                               throw new IndexOutOfBoundsException();
-                       } else if (input == 0) {
-
-                               if (this.nonForwardedFields1 == null) {
-                                       return 
super.getForwardingTargetFields(0, sourceField);
-                               }
-                               else {
-                                       if 
(this.nonForwardedFields1.contains(sourceField)) {
-                                               return FieldSet.EMPTY_SET;
-                                       } else {
-                                               return new 
FieldSet(sourceField);
-                                       }
-                               }
-                       } else {
-
-                               if (this.nonForwardedFields2 == null) {
-                                       return 
super.getForwardingTargetFields(1, sourceField);
-                               }
-                               else {
-                                       if 
(this.nonForwardedFields2.contains(sourceField)) {
-                                               return FieldSet.EMPTY_SET;
-                                       } else {
-                                               return new 
FieldSet(sourceField);
-                                       }
-                               }
-                       }
-               }
-
-               @Override
-               public void addForwardedField(int input, int sourceField, int 
destinationField) {
-                       if (input != 0 && input != 1) {
-                               throw new IndexOutOfBoundsException();
-                       } else if (input == 0) {
-                               if (this.nonForwardedFields1 == null) {
-                                       super.addForwardedField(0, sourceField, 
destinationField);
-                               } else {
-                                       throw new 
UnsupportedOperationException("When defining fields as implicitly constant for 
an input" +
-                                                       "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                               }
-                       } else {
-                               if (this.nonForwardedFields2 == null) {
-                                       super.addForwardedField(1, sourceField, 
destinationField);
-                               } else {
-                                       throw new 
UnsupportedOperationException("When defining fields as implicitly constant for 
an input" +
-                                                       "(such as through the 
ConstantFieldsFirstExcept annotation), you cannot manually add forwarded 
fields.");
-                               }
-                       }
-               }
-
-               @Override
-               public FieldSet getReadFields(int input) {
-                       return null;
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
deleted file mode 100644
index 3afb271..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * 
- * The JoinFunction must be implemented by functions of a {@link 
org.apache.flink.api.java.record.operators.JoinOperator}.
- * It resembles an equality join of both inputs on their key fields.
- */
-
-@Deprecated
-public abstract class JoinFunction extends AbstractRichFunction implements 
FlatJoinFunction<Record, Record, Record> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       @Override
-       public abstract void join(Record value1, Record value2, 
Collector<Record> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
deleted file mode 100644
index e51c35f..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * The MapFunction must be extended to provide a mapper implementation
- * By definition, the mapper is called for each individual input record.
- */
-@Deprecated
-public abstract class MapFunction extends AbstractRichFunction implements 
GenericCollectorMap<Record, Record> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * This method must be implemented to provide a user implementation of 
a mapper.
-        * It is called for each individual record.
-        * 
-        * @param record The record to be mapped.
-        * @param out A collector that collects all output records.
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the map 
task and lets the fail-over logic
-        *                   decide whether to retry the mapper execution.
-        */
-       @Override
-       public abstract void map(Record record, Collector<Record> out) throws 
Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
deleted file mode 100644
index ac18c95..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * 
- * The MapPartitionFunction must be extended to provide a map partition 
implementation
- * By definition, the map partition is called for a full input set.
- */
-
-@Deprecated
-public abstract class MapPartitionFunction extends AbstractRichFunction 
implements org.apache.flink.api.common.functions.MapPartitionFunction<Record, 
Record> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * This method must be implemented to provide a user implementation of 
a MapPartitionFunction.
-        * It is called for a full input set.
-        *
-        * @param values all input records
-        * @param out A collector that collects all output records.
-        *
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the map 
task and lets the fail-over logic
-        *                   decide whether to retry the mapper execution.
-        */
-       @Override
-       public abstract void mapPartition(Iterable<Record> values, 
Collector<Record> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
deleted file mode 100644
index 96350aa..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * 
- * The ReduceFunction must be extended to provide a reducer implementation, as 
invoked by a
- * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
- */
-@Deprecated
-public abstract class ReduceFunction extends AbstractRichFunction {
-       
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * The central function to be implemented for a reducer. The function 
receives per call one
-        * key and all the values that belong to that key. Each key is 
guaranteed to be processed by exactly
-        * one function call across all involved instances across all computing 
nodes.
-        * 
-        * @param records All records that belong to the given input key.
-        * @param out The collector to hand results to.
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the reduce 
task and lets the fail-over logic
-        *                   decide whether to retry the reduce execution.
-        */
-       public abstract void reduce(Iterator<Record> records, Collector<Record> 
out) throws Exception;
-
-       /**
-        * No default implementation provided.
-        * This method must be overridden by reduce stubs that want to make use 
of the combining feature.
-        * In addition, the ReduceFunction extending class must be annotated as 
Combinable.
-        * Note that this function must be implemented, if the reducer is 
annotated as combinable.
-        * <p>
-        * The use of the combiner is typically a pre-reduction of the data. It 
works similar as the reducer, only that it
-        * is not guaranteed to see all values with the same key in one call to 
the combine function. Since it is called
-        * prior to the <code>reduce()</code> method, input and output types of 
the combine method are the input types of
-        * the <code>reduce()</code> method.
-        * 
-        * @see 
org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-        * @param records
-        *        The records to be combined. Unlike in the reduce method, 
these are not necessarily all records
-        *        belonging to the given key.
-        * @param out The collector to write the result to.
-        * 
-        * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime. When the
-        *                   runtime catches an exception, it aborts the 
combine task and lets the fail-over logic
-        *                   decide whether to retry the combiner execution.
-        */
-       public void combine(Iterator<Record> records, Collector<Record> out) 
throws Exception {
-               // to be implemented, if the reducer should use a combiner. 
Note that the combining method
-               // is only used, if the stub class is further annotated with 
the annotation
-               // @ReduceOperator.Combinable
-               reduce(records, out);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
deleted file mode 100644
index c94c727..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ValueUtil;
-
-/**
- * input format for java collection input. It can accept collection data or 
serializable iterator
- */
-public class CollectionInputFormat extends GenericInputFormat<Record> 
implements NonParallelInput {
-
-       private static final long serialVersionUID = 1L;
-
-       private Collection<?> dataSet; // input data as collection
-
-       private Iterator<?> serializableIter; // input data as serializable 
iterator
-
-       private transient Iterator<?> it;
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return !it.hasNext();
-       }
-
-       @Override
-       public void open(GenericInputSplit split) throws IOException {
-               super.open(split);
-               if (serializableIter != null) {
-                       it = serializableIter;
-               }
-               else {
-                       it = this.dataSet.iterator();
-               }
-       }
-
-       @Override
-       public Record nextRecord(Record record) throws IOException {
-               if (it.hasNext()) {
-                       record.clear();
-                       Object b = it.next();
-                       // check whether the record field is one-dimensional or 
multi-dimensional
-                       if (b.getClass().isArray()) {
-                               for (Object s : (Object[]) b) {
-                                       
record.addField(ValueUtil.toFlinkValueType(s));
-                               }
-                       }
-                       else if (b instanceof Collection) {
-                               @SuppressWarnings("unchecked")
-                               Iterator<Object> tmpIter = 
((Collection<Object>) b).iterator();
-                               while (tmpIter.hasNext()) {
-                                       Object s = tmpIter.next();
-                                       
record.addField(ValueUtil.toFlinkValueType(s));
-                               }
-                       }
-                       else {
-                               record.setField(0, 
ValueUtil.toFlinkValueType(b));
-                       }
-                       return record;
-               } else {
-                       return null;
-               }
-       }
-
-       public void setData(Collection<?> data) {
-               this.dataSet = data;
-               this.serializableIter = null;
-       }
-
-       public <T extends Iterator<?>, Serializable> void setIter(T iter) {
-               this.serializableIter = iter;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
deleted file mode 100644
index 4e92874..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.types.parser.FieldParser;
-
-import java.io.IOException;
-
-/**
- * Input format to parse text files and generate Records. 
- * The input file is structured by record delimiters and field delimiters (CSV 
files are common).
- * Record delimiter separate records from each other ('\n' is common).
- * Field delimiters separate fields within a record. 
- * Record and field delimiters must be configured using the InputFormat {@link 
Configuration}.
- * 
- * The number of fields to parse must be configured as well.  
- * For each field a data type must be specified using the {@link 
CsvInputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key.
- * 
- * The position within the text record can be configured for each field using 
the {@link CsvInputFormat#TEXT_POSITION_PARAMETER_PREFIX} config key.
- * Either all text positions must be configured or none. If none is 
configured, the index of the config key is used.
- * The position of a value within the {@link Record} is the index of the 
config key.
- * 
- * @see Configuration
- * @see Record
- */
-@SuppressWarnings("deprecation")
-public class CsvInputFormat extends GenericCsvInputFormat<Record> {
-       
-       private static final long serialVersionUID = 1L;
-       
-       private transient Value[] parsedValues;
-       
-       private int[] targetPositions = new int[0];
-
-       private boolean configured = false;
-       
-       //To speed up readRecord processing. Used to find windows line endings.
-       //It is set when open so that readRecord does not have to evaluate it
-       private boolean lineDelimiterIsLinebreak = false;
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Constructors and getters/setters for the configurable parameters
-       // 
--------------------------------------------------------------------------------------------
-       
-       public CsvInputFormat() {
-               super();
-       }
-       
-       public CsvInputFormat(char fieldDelimiter) {
-               super();
-               setFieldDelimiter(fieldDelimiter);
-       }
-       
-       public CsvInputFormat(Class<? extends Value> ... fields) {
-               super();
-               setFieldTypes(fields);
-       }
-       
-       public CsvInputFormat(char fieldDelimiter, Class<? extends Value> ... 
fields) {
-               super();
-               setFieldDelimiter(fieldDelimiter);
-               setFieldTypes(fields);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       public void setFieldTypesArray(Class<? extends Value>[] fieldTypes) {
-               setFieldTypes(fieldTypes);
-       }
-
-       public void setFieldTypes(Class<? extends Value> ... fieldTypes) {
-               if (fieldTypes == null) {
-                       throw new IllegalArgumentException("Field types must 
not be null.");
-               }
-               
-               // sanity check
-               for (Class<? extends Value> type : fieldTypes) {
-                       if (type != null && 
!Value.class.isAssignableFrom(type)) {
-                               throw new IllegalArgumentException("The types 
must be subclasses if " + Value.class.getName());
-                       }
-               }
-               
-               setFieldTypesGeneric(fieldTypes);
-       }
-
-       public void setFields(int[] sourceFieldIndices, Class<? extends 
Value>[] fieldTypes) {
-               Preconditions.checkNotNull(fieldTypes);
-               
-               // sanity check
-               for (Class<? extends Value> type : fieldTypes) {
-                       if (!Value.class.isAssignableFrom(type)) {
-                               throw new IllegalArgumentException("The types 
must be subclasses if " + Value.class.getName());
-                       }
-               }
-               
-               setFieldsGeneric(sourceFieldIndices, fieldTypes);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Pre-flight: Configuration
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void configure(Configuration config) {
-               super.configure(config);
-
-               if (configured) {
-                       return;
-               }
-               
-               final String fieldDelimStr = 
config.getString(FIELD_DELIMITER_PARAMETER, null);
-               if (fieldDelimStr != null) {
-                       setFieldDelimiter(fieldDelimStr);
-               }
-               
-               // read number of field configured via configuration
-               int numConfigFields = config.getInteger(NUM_FIELDS_PARAMETER, 
-1);
-               if (numConfigFields != -1) {
-                       if (numConfigFields <= 0) {
-                               throw new IllegalConfigurationException("The 
number of fields for the CsvInputFormat is invalid.");
-                       }
-                       
-                       if (getNumberOfNonNullFields() > 0) {
-                               throw new IllegalConfigurationException("Mixing 
configuration via instance parameters and config parameters is not possible.");
-                       }
-               
-                       int[] textPosIdx = new int[numConfigFields];
-                       boolean anyTextPosSet = false;
-                       boolean allTextPosSet = true;
-                       int maxTextPos = -1;
-                       
-                       // parse text positions
-                       for (int i = 0; i < numConfigFields; i++) {
-                               int pos = 
config.getInteger(TEXT_POSITION_PARAMETER_PREFIX + i, -1);
-                               if (pos == -1) {
-                                       allTextPosSet = false;
-                                       textPosIdx[i] = i;
-                                       maxTextPos = i;
-                               } else {
-                                       anyTextPosSet = true;
-                                       textPosIdx[i] = pos;
-                                       maxTextPos = pos > maxTextPos ? pos : 
maxTextPos;
-                               }
-                       }
-                       // check if either none or all text positions have been 
set
-                       if (anyTextPosSet && !allTextPosSet) {
-                               throw new IllegalArgumentException("Invalid 
configuration for CsvInputFormat: " +
-                                               "Not all text positions set");
-                       }
-                       
-                       // init the array of types to be set. unify the types 
from the config 
-                       // with the types array set on the instance
-                       
-                       // make sure we have a sufficiently large types array
-                       @SuppressWarnings("unchecked")
-                       Class<? extends Value>[] types = (Class<? extends 
Value>[]) new Class[maxTextPos+1];
-                       int[] targetPos = new int[maxTextPos+1];
-                       
-                       ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-                       
-                       // set the fields
-                       try {
-                               for (int i = 0; i < numConfigFields; i++) {
-                                       int pos = textPosIdx[i];
-                                       
-                                       Class<? extends Value> clazz = 
config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, 
cl).asSubclass(Value.class);
-                                       if (clazz == null) {
-                                               throw new 
IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
-                                                       "No field parser class 
for parameter " + i);
-                                       }
-                                       
-                                       types[pos] = clazz;
-                                       targetPos[pos] = i;
-                               }
-                       }
-                       catch (ClassNotFoundException e) {
-                               throw new RuntimeException("Could not resolve 
type classes", e);
-                       }
-                       
-                       // update the field types
-                       setFieldTypes(types);
-                       
-                       // make a dense target pos array
-                       this.targetPositions = new int[numConfigFields];
-                       for (int i = 0, k = 0; i < targetPos.length; i++) {
-                               if (types[i] != null) {
-                                       this.targetPositions[k++] = 
targetPos[i];
-                               }
-                       }
-               }
-               else {
-                       // not configured via config parameters
-                       if (this.targetPositions.length == 0) {
-                               this.targetPositions = new 
int[getNumberOfNonNullFields()];
-                               for (int i = 0; i < 
this.targetPositions.length; i++) {
-                                       this.targetPositions[i] = i;
-                               }
-                       }
-               }
-               
-               if (getNumberOfNonNullFields() == 0) {
-                       throw new IllegalConfigurationException("No fields 
configured in the CsvInputFormat.");
-               }
-
-               this.configured = true;
-       }
-       
-       
-       @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
-               
-               @SuppressWarnings("unchecked")
-               FieldParser<Value>[] fieldParsers = (FieldParser<Value>[]) 
getFieldParsers();
-               
-               // create the value holders
-               this.parsedValues = new Value[fieldParsers.length];
-               for (int i = 0; i < fieldParsers.length; i++) {
-                       this.parsedValues[i] = fieldParsers[i].createValue();
-               }
-               
-               //left to right evaluation makes access [0] okay
-               //this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
-               if(this.getDelimiter().length == 1 && this.getDelimiter()[0] == 
'\n' ) {
-                                       this.lineDelimiterIsLinebreak = true;
-               }
-       }
-       
-       @Override
-       public Record readRecord(Record reuse, byte[] bytes, int offset, int 
numBytes) throws ParseException {
-               /*
-                * Fix to support windows line endings in CSVInputFiles with 
standard delimiter setup = \n
-                */
-               //Find windows end line, so find carriage return before the 
newline
-               if(this.lineDelimiterIsLinebreak && bytes[offset + numBytes -1] 
== '\r') {
-                       //reduce the number of bytes so that the Carriage 
return is not taken as data
-                       numBytes--;
-               }
-               
-               if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-                       // valid parse, map values into pact record
-                       for (int i = 0; i < parsedValues.length; i++) {
-                               reuse.setField(targetPositions[i], 
parsedValues[i]);
-                       }
-                       return reuse;
-               } else {
-                       return null;
-               }
-       }
-       
-       // 
============================================================================================
-       //  Parameterization via configuration
-       // 
============================================================================================
-       
-       // ------------------------------------- Config Keys 
------------------------------------------
-       
-       private static final String FIELD_DELIMITER_PARAMETER = 
"recordinformat.delimiter.field";
-       
-       private static final String NUM_FIELDS_PARAMETER = 
"recordinformat.field.number";
-       
-       private static final String FIELD_TYPE_PARAMETER_PREFIX = 
"recordinformat.field.type_";
-       
-       private static final String TEXT_POSITION_PARAMETER_PREFIX = 
"recordinformat.text.position_";
-       
-       /**
-        * Creates a configuration builder that can be used to set the input 
format's parameters to the config in a fluent
-        * fashion.
-        * 
-        * @return A config builder for setting parameters.
-        */
-       public static ConfigBuilder configureRecordFormat(FileDataSource 
target) {
-               return new ConfigBuilder(target, target.getParameters());
-       }
-       
-       /**
-        * An abstract builder used to set parameters to the input format's 
configuration in a fluent way.
-        */
-       protected static class AbstractConfigBuilder<T> extends 
DelimitedInputFormat.AbstractConfigBuilder<T> {
-               
-               protected final RecordFormatCompilerHints hints;
-               
-               /**
-                * Creates a new builder for the given configuration.
-                *
-                * @param contract The contract from which the compiler hints 
are used.
-                *                 If contract is null, new compiler hints are 
generated.  
-                * @param config The configuration into which the parameters 
will be written.
-                */
-               protected AbstractConfigBuilder(Operator<?> contract, 
Configuration config) {
-                       super(config);
-                       
-                       if (contract != null) {
-                               this.hints = new 
RecordFormatCompilerHints(contract.getCompilerHints());
-                               
-                               // initialize with 2 bytes length for the 
header (its actually 3, but one is skipped on the first field
-                               this.hints.addWidthRecordFormat(2);
-                       }
-                       else {
-                               this.hints = new RecordFormatCompilerHints(new 
CompilerHints());
-                       }
-               }
-               
-               // 
--------------------------------------------------------------------
-               
-               /**
-                * Sets the delimiter that delimits the individual fields in 
the records textual input representation.
-                * 
-                * @param delimiter The character to be used as a field 
delimiter.
-                * @return The builder itself.
-                */
-               public T fieldDelimiter(char delimiter) {
-                       this.config.setString(FIELD_DELIMITER_PARAMETER, 
String.valueOf(delimiter));
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-               
-               public T field(Class<? extends Value> type, int textPosition) {
-                       return field(type, textPosition, 
Float.NEGATIVE_INFINITY);
-
-               }
-               
-               public T field(Class<? extends Value> type, int textPosition, 
float avgLen) {
-                       // register field
-                       final int numYet = 
this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
-                       this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + 
numYet, type);
-                       this.config.setInteger(TEXT_POSITION_PARAMETER_PREFIX + 
numYet, textPosition);
-                       this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 
1);
-                       
-                       // register length
-                       if (avgLen == Float.NEGATIVE_INFINITY) {
-                               if (type == IntValue.class) {
-                                       avgLen = 4f;
-                               } else if (type == DoubleValue.class || type == 
LongValue.class) {
-                                       avgLen = 8f;
-                               }
-                       }
-                       
-                       if (avgLen != Float.NEGATIVE_INFINITY) {
-                               // add the len, plus one byte for the offset 
coding
-                               this.hints.addWidthRecordFormat(avgLen + 1);
-                       }
-                       
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-       }
-       
-       /**
-        * A builder used to set parameters to the input format's configuration 
in a fluent way.
-        */
-       public static class ConfigBuilder extends 
AbstractConfigBuilder<ConfigBuilder> {
-               
-               protected ConfigBuilder(Operator<?> target, Configuration 
targetConfig) {
-                       super(target, targetConfig);
-               }
-       }
-       
-       private static final class RecordFormatCompilerHints extends 
CompilerHints {
-               
-               private float width = 0.0f;
-               
-               private RecordFormatCompilerHints(CompilerHints parent) {
-                       copyFrom(parent);
-               }
-
-               @Override
-               public float getAvgOutputRecordSize() {
-                       float superWidth = super.getAvgOutputRecordSize();
-                       if (superWidth > 0.0f || this.width <= 0.0f) {
-                               return superWidth;
-                       } else {
-                               return this.width;
-                       }
-               }
-
-               private void addWidthRecordFormat(float width) {
-                       this.width += width;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
deleted file mode 100644
index a5d83c3..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-
-/**
- * This is an OutputFormat to serialize {@link Record}s to text. The output is
- * structured by record delimiters and field delimiters as common in CSV files.
- * Record delimiter separate records from each other ('\n' is common). Field
- * delimiters separate fields within a record. Record and field delimiters can
- * be configured using the CsvOutputFormat {@link Configuration}.
- * 
- * The number of fields to serialize must be configured as well. For each field
- * the type of the {@link Value} must be specified using the
- * {@link CsvOutputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key and an index
- * running from 0 to the number of fields.
- * 
- * The position within the {@link Record} can be configured for each field 
using
- * the {@link CsvOutputFormat#RECORD_POSITION_PARAMETER_PREFIX} config key.
- * Either all {@link Record} positions must be configured or none. If none is
- * configured, the index of the config key is used.
- * 
- * @see Value
- * @see Configuration
- * @see Record
- */
-@SuppressWarnings("deprecation")
-public class CsvOutputFormat extends FileOutputFormat {
-       private static final long serialVersionUID = 1L;
-
-       public static final String RECORD_DELIMITER_PARAMETER = 
"output.record.delimiter";
-
-       private static final String RECORD_DELIMITER_ENCODING = 
"output.record.delimiter-encoding";
-
-       public static final String FIELD_DELIMITER_PARAMETER = 
"output.record.field-delimiter";
-
-       public static final String NUM_FIELDS_PARAMETER = 
"output.record.num-fields";
-
-       public static final String FIELD_TYPE_PARAMETER_PREFIX = 
"output.record.type_";
-
-       public static final String RECORD_POSITION_PARAMETER_PREFIX = 
"output.record.position_";
-
-       public static final String LENIENT_PARSING = "output.record.lenient";
-
-       @SuppressWarnings("unused")
-       private static final Logger LOG = 
LoggerFactory.getLogger(CsvOutputFormat.class);
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private int numFields;
-
-       private Class<? extends Value>[] classes;
-
-       private int[] recordPositions;
-
-       private Writer wrt;
-
-       private String fieldDelimiter;
-
-       private String recordDelimiter;
-
-       private String charsetName;
-
-       private boolean lenient;
-       
-       private boolean ctorInstantiation = false;
-
-       // 
--------------------------------------------------------------------------------------------
-       // Constructors and getters/setters for the configurable parameters
-       // 
--------------------------------------------------------------------------------------------
-
-       public CsvOutputFormat() {
-       }
-
-       /**
-        * Creates an instance of CsvOutputFormat. The position of the fields 
in the
-        * record is determined by the order in which the classes are given to 
this
-        * constructor. As the default value for separating records '\n' is 
used.
-        * The default field delimiter is ','.
-        * 
-        * @param types
-        *            The types of the fields that are in the record.
-        */
-       public CsvOutputFormat(Class<? extends Value>... types) {
-               this("\n", ",", types);
-       }
-
-       /**
-        * Creates an instance of CsvOutputFormat. The position of the fields 
in the
-        * record is determined by the order in which the classes are given to 
this
-        * constructor. As the default value for separating records '\n' is 
used.
-        * 
-        * @param fieldDelimiter
-        *            The delimiter that is used to separate the different 
fields in
-        *            the record.
-        * @param types
-        *            The types of the fields that are in the record.
-        */
-       public CsvOutputFormat(String fieldDelimiter, Class<? extends Value>... 
types) {
-               this("\n", fieldDelimiter, types);
-       }
-
-       /**
-        * Creates an instance of CsvOutputFormat. The position of the fields 
in the
-        * record is determined by the order in which the classes are given to 
this
-        * constructor.
-        * 
-        * @param recordDelimiter
-        *            The delimiter that is used to separate the different 
records.
-        * @param fieldDelimiter
-        *            The delimiter that is used to separate the different 
fields in
-        *            the record.
-        * @param types
-        *            The types of the fields that are in the record.
-        */
-       public CsvOutputFormat(String recordDelimiter, String fieldDelimiter, 
Class<? extends Value>... types) {
-               if (recordDelimiter == null) {
-                       throw new IllegalArgumentException("RecordDelmiter 
shall not be null.");
-               }
-               if (fieldDelimiter == null) {
-                       throw new IllegalArgumentException("FieldDelimiter 
shall not be null.");
-               }
-               if (types.length == 0) {
-                       throw new IllegalArgumentException("No field types 
given.");
-               }
-
-               this.fieldDelimiter = fieldDelimiter;
-               this.recordDelimiter = recordDelimiter;
-               this.lenient = false;
-
-               setTypes(types);
-               ctorInstantiation = true;
-       }
-       
-       public void setTypes(Class<? extends Value>... types) {
-               this.classes = types;
-               this.numFields = types.length;
-               this.recordPositions = new int[types.length];
-               for (int i = 0; i < types.length; i++) {
-                       if (types[i] == null) {
-                               throw new IllegalArgumentException("Invalid 
Constructor Parameter: No type class for parameter " + (2 * i));
-                       }
-                       this.recordPositions[i] = i;
-               }
-               
-               if (this.fieldDelimiter == null) {
-                       this.fieldDelimiter = ",";
-               }
-               
-               if (this.recordDelimiter == null) {
-                       this.recordDelimiter = "\n";
-               }
-       }
-       
-       public void setLenient(boolean lenient) {
-               this.lenient = lenient;
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-               super.configure(parameters);
-
-               int configNumFields = 
parameters.getInteger(NUM_FIELDS_PARAMETER, -1);
-               
-               if (ctorInstantiation) {
-                       if (configNumFields > 0) {
-                               throw new 
IllegalStateException("CsvOutputFormat instantiated via both parameters and 
config.");
-                       }                               
-                       return;                                                 
                        //already configured, no further actions required
-               }
-               
-               if (configNumFields < 1) {                      
-                       throw new IllegalStateException("CsvOutputFormat not 
configured via parameters or config.");                    
-               }
-               
-               this.numFields = configNumFields;
-
-               @SuppressWarnings("unchecked")
-               Class<Value>[] arr = new Class[this.numFields];
-               this.classes = arr;
-
-               try {
-                       ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-                       
-                       for (int i = 0; i < this.numFields; i++) {
-                               Class<? extends Value> clazz =  
parameters.<Value>getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl);
-                               if (clazz == null) {
-                                       throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
-                               }
-       
-                               this.classes[i] = clazz;
-                       }
-               }
-               catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Could not resolve type 
classes", e);
-               }
-
-               this.recordPositions = new int[this.numFields];
-               boolean anyRecordPosDefined = false;
-               boolean allRecordPosDefined = true;
-
-               for (int i = 0; i < this.numFields; i++) {
-                       int pos = 
parameters.getInteger(RECORD_POSITION_PARAMETER_PREFIX + i, Integer.MIN_VALUE);
-                       if (pos != Integer.MIN_VALUE) {
-                               anyRecordPosDefined = true;
-                               if (pos < 0) {
-                                       throw new 
IllegalArgumentException("Invalid configuration for CsvOutputFormat: "
-                                                       + "Invalid record 
position for parameter " + i);
-                               }
-                               this.recordPositions[i] = pos;
-                       } else {
-                               allRecordPosDefined = false;
-                               this.recordPositions[i] = i;
-                       }
-               }
-
-               if (anyRecordPosDefined && !allRecordPosDefined) {
-                       throw new IllegalArgumentException("Invalid 
configuration for CsvOutputFormat: "
-                                       + "Either none or all record positions 
must be defined.");
-               }
-
-               this.recordDelimiter = 
parameters.getString(RECORD_DELIMITER_PARAMETER, 
AbstractConfigBuilder.NEWLINE_DELIMITER);
-               if (this.recordDelimiter == null) {
-                       throw new IllegalArgumentException("The delimiter in 
the DelimitedOutputFormat must not be null.");
-               }
-               this.charsetName = 
parameters.getString(RECORD_DELIMITER_ENCODING, null);
-               this.fieldDelimiter = 
parameters.getString(FIELD_DELIMITER_PARAMETER, ",");
-               this.lenient = parameters.getBoolean(LENIENT_PARSING, false);
-       }
-
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException
-       {
-               super.open(taskNumber, numTasks);
-               this.wrt = this.charsetName == null ? new 
OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) :
-                               new OutputStreamWriter(new 
BufferedOutputStream(this.stream, 4096), this.charsetName);
-       }
-
-       @Override
-       public void close() throws IOException {
-               if (wrt != null) {
-                       this.wrt.close();
-               }
-               super.close();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public void writeRecord(Record record) throws IOException {
-               int numRecFields = record.getNumFields();
-               int readPos;
-
-               for (int i = 0; i < this.numFields; i++) {
-                       readPos = this.recordPositions[i];
-                       if (readPos < numRecFields) {
-                               Value v = 
record.getField(this.recordPositions[i], this.classes[i]);
-                               if (v != null) {
-                                       if (i != 0) {
-                                               
this.wrt.write(this.fieldDelimiter);
-                                       }
-                                       this.wrt.write(v.toString());
-                               } else {
-                                       if (this.lenient) {
-                                               if (i != 0) {
-                                                       
this.wrt.write(this.fieldDelimiter);
-                                               }
-                                       } else {
-                                               throw new RuntimeException("Cannot serialize record with <null> value at position: " + readPos);
-                                       }
-                               }
-
-                       } else {
-                               if (this.lenient) {
-                                       if (i != 0) {
-                                               
this.wrt.write(this.fieldDelimiter);
-                                       }
-                               } else {
-                                       throw new RuntimeException("Cannot serialize record with out field at position: " + readPos);
-                               }
-                       }
-
-               }
-
-               // add the record delimiter
-               this.wrt.write(this.recordDelimiter);
-       }
-
-       // 
============================================================================================
-
-       /**
-        * Creates a configuration builder that can be used to set the input
-        * format's parameters to the config in a fluent fashion.
-        * 
-        * @return A config builder for setting parameters.
-        */
-       public static ConfigBuilder configureRecordFormat(FileDataSink target) {
-               return new ConfigBuilder(target.getParameters());
-       }
-
-       /**
-        * Abstract builder used to set parameters to the input format's
-        * configuration in a fluent way.
-        */
-       protected static abstract class AbstractConfigBuilder<T> extends 
FileOutputFormat.AbstractConfigBuilder<T> {
-               private static final String NEWLINE_DELIMITER = "\n";
-
-               // 
--------------------------------------------------------------------
-
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param config
-                *            The configuration into which the parameters will 
be
-                *            written.
-                */
-               protected AbstractConfigBuilder(Configuration config) {
-                       super(config);
-               }
-
-               // 
--------------------------------------------------------------------
-
-               /**
-                * Sets the delimiter to be a single character, namely the 
given one.
-                * The character must be within the value range <code>0</code> 
to
-                * <code>127</code>.
-                * 
-                * @param delimiter
-                *            The delimiter character.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(char delimiter) {
-                       if (delimiter == '\n') {
-                               
this.config.setString(RECORD_DELIMITER_PARAMETER, NEWLINE_DELIMITER);
-                       } else {
-                               
this.config.setString(RECORD_DELIMITER_PARAMETER, String.valueOf(delimiter));
-                       }
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the delimiter to be the given string. The string will be
-                * converted to bytes for more efficient comparison during input
-                * parsing. The conversion will be done using the platforms 
default
-                * charset.
-                * 
-                * @param delimiter
-                *            The delimiter string.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(String delimiter) {
-                       this.config.setString(RECORD_DELIMITER_PARAMETER, 
delimiter);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the delimiter to be the given string. The string will be
-                * converted to bytes for more efficient comparison during input
-                * parsing. The conversion will be done using the charset with 
the given
-                * name. The charset must be available on the processing nodes,
-                * otherwise an exception will be raised at runtime.
-                * 
-                * @param delimiter
-                *            The delimiter string.
-                * @param charsetName
-                *            The name of the encoding character set.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(String delimiter, String charsetName) {
-                       this.config.setString(RECORD_DELIMITER_PARAMETER, 
delimiter);
-                       this.config.setString(RECORD_DELIMITER_ENCODING, 
charsetName);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the delimiter that delimits the individual fields in 
the records
-                * textual output representation.
-                * 
-                * @param delimiter
-                *            The character to be used as a field delimiter.
-                * @return The builder itself.
-                */
-               public T fieldDelimiter(char delimiter) {
-                       this.config.setString(FIELD_DELIMITER_PARAMETER, 
String.valueOf(delimiter));
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Adds a field of the record to be serialized to the output. 
The field
-                * at the given position will be interpreted as the type 
represented by
-                * the given class. The types {@link Object#toString()} method 
will be
-                * invoked to create a textual representation.
-                * 
-                * @param type
-                *            The type of the field.
-                * @param recordPosition
-                *            The position in the record.
-                * @return The builder itself.
-                */
-               public T field(Class<? extends Value> type, int recordPosition) 
{
-                       final int numYet = 
this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
-                       this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + 
numYet, type);
-                       this.config.setInteger(RECORD_POSITION_PARAMETER_PREFIX 
+ numYet, recordPosition);
-                       this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 
1);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-
-               /**
-                * Sets the leniency for the serializer. A lenient serializer 
simply
-                * skips missing fields and null fields in the record, while a 
non
-                * lenient one throws an exception.
-                * 
-                * @param lenient
-                *            True, if the serializer should be lenient, false
-                *            otherwise.
-                * @return The builder itself.
-                */
-               public T lenient(boolean lenient) {
-                       this.config.setBoolean(LENIENT_PARSING, lenient);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-       }
-
-       /**
-        * A builder used to set parameters to the input format's configuration 
in a
-        * fluent way.
-        */
-       public static final class ConfigBuilder extends 
AbstractConfigBuilder<ConfigBuilder> {
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param targetConfig
-                *            The configuration into which the parameters will 
be
-                *            written.
-                */
-               protected ConfigBuilder(Configuration targetConfig) {
-                       super(targetConfig);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
deleted file mode 100644
index e74ee49..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * Base implementation for input formats that split the input at a delimiter into records.
- * The parsing of the record bytes into the record has to be implemented in the
- * {@link #readRecord(Record, byte[], int, int)} method.
- * <p>
- * The default delimiter is the newline character {@code '\n'}.
- */
-public abstract class DelimitedInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
-       
-       private static final long serialVersionUID = -2297199268758915692L;
-
-       // --------------------------------------------------------------------------------------------
-       //  User-defined behavior
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * This function parses the given byte array which represents a serialized key/value
-        * pair. The parsed content is then returned by setting the pair variables. If the
-        * byte array contains invalid content the record can be skipped by returning <tt>false</tt>.
-        * 
-        * @param reuse The optional reusable holder for the line that is read.
-        * @param bytes The serialized record.
-        * @return returns whether the record was successfully deserialized
-        */
-       public abstract Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes);
-}

Reply via email to