http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
deleted file mode 100644
index 49d9a2a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-
-/**
- * The base class for output formats that serialize their records into a 
delimited sequence.
- */
-@SuppressWarnings("deprecation")
-public abstract class DelimitedOutputFormat extends FileOutputFormat {
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The configuration key for the entry that defines the record 
delimiter.
-        */
-       public static final String RECORD_DELIMITER = 
"pact.output.delimited.delimiter";
-
-       /**
-        * The configuration key to set the record delimiter encoding.
-        */
-       private static final String RECORD_DELIMITER_ENCODING = 
"pact.output.delimited.delimiter-encoding";
-       
-       /**
-        * The configuration key for the entry that defines the write-buffer 
size.
-        */
-       public static final String WRITE_BUFFER_SIZE = 
"pact.output.delimited.buffersize";
-       
-       /**
-        * The default write-buffer size. 64 KiByte. 
-        */
-       private static final int DEFAULT_WRITE_BUFFER_SIZE = 64 * 1024;
-       
-       /**
-        * The minimal write-buffer size, 1 KiByte.
-        */
-       private static final int MIN_WRITE_BUFFER_SIZE = 1024;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       private byte[] delimiter;
-       
-       private byte[] buffer;
-       
-       private byte[] targetArray = new byte[64];
-       
-       private int pos;
-       
-       private int bufferSize;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       
-       /**
-        * Calls the super classes to configure themselves and reads the config 
parameters for the delimiter and
-        * the write buffer size.
-        * 
-        *  @param config The configuration to read the parameters from.
-        *  
-        * @see 
org.apache.flink.api.java.record.io.FileOutputFormat#configure(org.apache.flink.configuration.Configuration)
-        */
-       public void configure(Configuration config)
-       {
-               super.configure(config);
-               
-               final String delim = config.getString(RECORD_DELIMITER, "\n");
-               final String charsetName = 
config.getString(RECORD_DELIMITER_ENCODING, null);           
-               if (delim == null) {
-                       throw new IllegalArgumentException("The delimiter in 
the DelimitedOutputFormat must not be null.");
-               }
-               try {
-                       this.delimiter = charsetName == null ? delim.getBytes() 
: delim.getBytes(charsetName);
-               } catch (UnsupportedEncodingException useex) {
-                       throw new IllegalArgumentException("The charset with 
the name '" + charsetName + 
-                               "' is not supported on this TaskManager 
instance.", useex);
-               }
-               
-               this.bufferSize = config.getInteger(WRITE_BUFFER_SIZE, 
DEFAULT_WRITE_BUFFER_SIZE);
-               if (this.bufferSize < MIN_WRITE_BUFFER_SIZE) {
-                       throw new IllegalArgumentException("The write buffer 
size must not be less than " + MIN_WRITE_BUFFER_SIZE
-                               + " bytes.");
-               }
-       }
-       
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException
-       {
-               super.open(taskNumber, numTasks);
-               
-               if (this.buffer == null) {
-                       this.buffer = new byte[this.bufferSize];
-               }
-               if (this.targetArray == null) {
-                       this.targetArray = new byte[64];
-               }
-               this.pos = 0;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-
-       @Override
-       public void close() throws IOException {
-               if (this.stream != null) {
-                       this.stream.write(this.buffer, 0, this.pos);
-               }
-               
-               // close file stream
-               super.close();
-       }
-
-       /**
-        * This method is called for every record so serialize itself into the 
given target array. The method should
-        * return the number of bytes occupied in the target array. If the 
target array is not large enough, a negative 
-        * value should be returned.
-        * <p>
-        * The absolute value of the returned integer can be given as a hint 
how large an array is required. The array is
-        * resized to the return value's absolute value, if that is larger than 
the current array size. Otherwise, the
-        * array size is simply doubled.
-        * 
-        * @param rec The record to be serialized.
-        * @param target The array to serialize the record into.
-        * @return The length of the serialized contents, or a negative value, 
indicating that the array is too small.
-        * 
-        * @throws Exception If the user code produces an exception that 
prevents processing the record, it should
-        *                   throw it such that the engine recognizes the 
situation as a fault.
-        */
-       public abstract int serializeRecord(Record rec, byte[] target) throws 
Exception;
-       
-       
-
-       @Override
-       public void writeRecord(Record record) throws IOException
-       {
-               int size;
-               try {
-                       while ((size = serializeRecord(record, 
this.targetArray)) < 0) {
-                               if (-size > this.targetArray.length) {
-                                       this.targetArray = new byte[-size];
-                               }
-                               else {
-                                       this.targetArray = new 
byte[this.targetArray.length * 2];
-                               }
-                       }
-               }
-               catch (Exception ex) {
-                       throw new IOException("Error while serializing the 
record to bytes: " + ex.getMessage(), ex);
-               }
-               
-               if (this.bufferSize - this.pos > size + this.delimiter.length) {
-                       System.arraycopy(this.targetArray, 0, this.buffer, 
this.pos, size);
-                       System.arraycopy(this.delimiter, 0, this.buffer, pos + 
size, this.delimiter.length);
-                       pos += size + this.delimiter.length;
-               }
-               else {
-                       // copy the target array (piecewise)
-                       int off = 0;
-                       while (off < size) {
-                               int toCopy = Math.min(size - off, 
this.bufferSize - this.pos);
-                               System.arraycopy(this.targetArray, off, 
this.buffer, this.pos, toCopy);
-
-                               off += toCopy;
-                               this.pos += toCopy;
-                               if (this.pos == this.bufferSize) {
-                                       this.pos = 0;
-                                       this.stream.write(this.buffer, 0, 
this.bufferSize);
-                               }
-                       }
-                       
-                       // copy the delimiter (piecewise)
-                       off = 0;
-                       while (off < this.delimiter.length) {
-                               int toCopy = Math.min(this.delimiter.length - 
off, this.bufferSize - this.pos);
-                               System.arraycopy(this.delimiter, off, 
this.buffer, this.pos, toCopy);
-
-                               off += toCopy;
-                               this.pos += toCopy;
-                               if (this.pos == this.bufferSize) {
-                                       this.pos = 0;
-                                       this.stream.write(this.buffer, 0, 
this.bufferSize);
-                               }
-                       }
-               }
-       }
-
-       // 
============================================================================================
-       
-       /**
-        * Creates a configuration builder that can be used to set the input 
format's parameters to the config in a fluent
-        * fashion.
-        * 
-        * @return A config builder for setting parameters.
-        */
-       public static ConfigBuilder configureDelimitedFormat(FileDataSink 
target) {
-               return new ConfigBuilder(target.getParameters());
-       }
-       
-       /**
-        * A builder used to set parameters to the input format's configuration 
in a fluent way.
-        */
-       protected static abstract class AbstractConfigBuilder<T> extends 
FileOutputFormat.AbstractConfigBuilder<T>
-       {
-               private static final String NEWLINE_DELIMITER = "\n";
-               
-               // 
--------------------------------------------------------------------
-               
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param config The configuration into which the parameters 
will be written.
-                */
-               protected AbstractConfigBuilder(Configuration config) {
-                       super(config);
-               }
-               
-               // 
--------------------------------------------------------------------
-               
-               /**
-                * Sets the delimiter to be a single character, namely the 
given one. The character must be within
-                * the value range <code>0</code> to <code>127</code>.
-                * 
-                * @param delimiter The delimiter character.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(char delimiter) {
-                       if (delimiter == '\n') {
-                               this.config.setString(RECORD_DELIMITER, 
NEWLINE_DELIMITER);
-                       } else {
-                               this.config.setString(RECORD_DELIMITER, 
String.valueOf(delimiter));
-                       }
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-               
-               /**
-                * Sets the delimiter to be the given string. The string will 
be converted to bytes for more efficient
-                * comparison during input parsing. The conversion will be done 
using the platforms default charset.
-                * 
-                * @param delimiter The delimiter string.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(String delimiter) {
-                       this.config.setString(RECORD_DELIMITER, delimiter);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-               
-               /**
-                * Sets the delimiter to be the given string. The string will 
be converted to bytes for more efficient
-                * comparison during input parsing. The conversion will be done 
using the charset with the given name.
-                * The charset must be available on the processing nodes, 
otherwise an exception will be raised at
-                * runtime.
-                * 
-                * @param delimiter The delimiter string.
-                * @param charsetName The name of the encoding character set.
-                * @return The builder itself.
-                */
-               public T recordDelimiter(String delimiter, String charsetName) {
-                       this.config.setString(RECORD_DELIMITER, delimiter);
-                       this.config.setString(RECORD_DELIMITER_ENCODING, 
charsetName);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-               
-               /**
-                * Sets the size of the write buffer.
-                * 
-                * @param sizeInBytes The size of the write buffer in bytes.
-                * @return The builder itself.
-                */
-               public T writeBufferSize(int sizeInBytes) {
-                       this.config.setInteger(WRITE_BUFFER_SIZE, sizeInBytes);
-                       @SuppressWarnings("unchecked")
-                       T ret = (T) this;
-                       return ret;
-               }
-       }
-       
-       /**
-        * A builder used to set parameters to the input format's configuration 
in a fluent way.
-        */
-       public static class ConfigBuilder extends 
AbstractConfigBuilder<ConfigBuilder>
-       {
-               /**
-                * Creates a new builder for the given configuration.
-                * 
-                * @param targetConfig The configuration into which the 
parameters will be written.
-                */
-               protected ConfigBuilder(Configuration targetConfig) {
-                       super(targetConfig);
-               }
-               
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
deleted file mode 100644
index d448c6a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.Record;
-
-/**
- * This input format starts an external process and reads its input from the 
standard out (stdout) of the started process.
- * The input is split into fixed-sized segments from which a {@link Record} is 
generated. 
- * The external process is started outside of the JVM via a provided start 
command and can be an arbitrary program, 
- * e.g., a data generator or a shell script. The input format checks the exit 
code of the process 
- * to validate whether the process terminated correctly. A list of allowed 
exit codes can be provided.
- * The input format requires ({@link ExternalProcessInputSplit} objects that 
hold the command to execute.
- * 
- * <b>Warning:</b> This format does not consume the standard error stream 
(stderr) of the started process. This might cause deadlocks. 
- *
- * @param <T> The type of the input split (must extend 
ExternalProcessInputSplit)
- */
-public abstract class ExternalProcessFixedLengthInputFormat<T extends 
ExternalProcessInputSplit> extends ExternalProcessInputFormat<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The config parameter which defines the fixed length of a record.
-        */
-       public static final String RECORDLENGTH_PARAMETER_KEY = 
"pact.input.recordLength";
-       
-       /**
-        * The default read buffer size = 1MB.
-        */
-       private static final int DEFAULT_TARGET_READ_BUFFER_SIZE = 1024 * 1024;
-       
-       /**
-        * Buffer to read a batch of records from a file 
-        */
-       private byte[] readBuffer;
-       
-       /**
-        * read position within the read buffer
-        */
-       private int readBufferReadPos;
-       
-       /**
-        * fill marker within the read buffer 
-        */
-       private int readBufferFillPos;
-       
-       /**
-        * remaining space within the read buffer
-        */
-       private int readBufferRemainSpace;
-       
-       /**
-        * target size of the read buffer
-        */
-       private int targetReadBufferSize = DEFAULT_TARGET_READ_BUFFER_SIZE;
-       
-       /**
-        * fixed length of all records
-        */
-       protected int recordLength;
-       
-       /**
-        * Flags to indicate the end of the split
-        */
-       private boolean noMoreStreamInput;
-       private boolean noMoreRecordBuffers;
-       
-       /**
-        * Reads a record out of the given buffer. This operation always 
consumes the standard number of
-        * bytes, regardless of whether the produced record was valid.
-        * 
-        * @param target The target Record
-        * @param buffer The buffer containing the binary data.
-        * @param startPos The start position in the byte array.
-        * @return True, is the record is valid, false otherwise.
-        */
-       public abstract boolean readBytes(Record target, byte[] buffer, int 
startPos);
-       
-
-       @Override
-       public void configure(Configuration parameters)
-       {
-               // configure parent
-               super.configure(parameters);
-               
-               // read own parameters
-               this.recordLength = 
parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0);
-               if (recordLength < 1) {
-                       throw new IllegalArgumentException("The record length 
parameter must be set and larger than 0.");
-               }
-               
-       }
-       
-       /**
-        * Sets the target size of the buffer to be used to read from the 
stdout stream. 
-        * The actual size depends on the record length since it is chosen such 
that records are not split.
-        * This method has only an effect, if it is called before the input 
format is opened.
-        * 
-        * @param targetReadBufferSize The target size of the read buffer.
-        */
-       public void setTargetReadBufferSize(int targetReadBufferSize)
-       {
-               this.targetReadBufferSize = targetReadBufferSize;
-       }
-       
-
-       @Override
-       public void open(GenericInputSplit split) throws IOException {
-               
-               super.open(split);
-               
-               // compute readBufferSize
-               if(recordLength > this.targetReadBufferSize) {
-                       // read buffer is at least as big as record
-                       this.readBuffer = new byte[recordLength];
-               } else if (this.targetReadBufferSize % recordLength == 0) {
-                       // target read buffer size is a multiple of record 
length, so it's ok
-                       this.readBuffer = new byte[this.targetReadBufferSize];
-               } else {
-                       // extent default read buffer size such that records 
are not split
-                       this.readBuffer = new byte[(recordLength - 
(this.targetReadBufferSize % recordLength)) + this.targetReadBufferSize];
-               }
-               
-               // initialize read buffer positions
-               this.readBufferReadPos = 0;
-               this.readBufferFillPos = 0;
-               this.readBufferRemainSpace = readBuffer.length;
-               // initialize end flags
-               this.noMoreStreamInput = false;
-               this.noMoreRecordBuffers = false;
-               
-       }
-       
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return noMoreRecordBuffers;
-       }
-       
-
-       @Override
-       public Record nextRecord(Record reuse) throws IOException {
-               // check if read buffer must be filled (less than one record 
contained)
-               if(this.readBufferFillPos - this.readBufferReadPos < 
this.recordLength) {
-                       // try to fill read buffer
-                       if(!this.fillReadBuffer()) {
-                               return null;
-                       }
-               }
-
-               // update read buffer read marker
-               this.readBufferReadPos += this.recordLength;
-               
-               return this.readBytes(reuse, readBuffer, 
(this.readBufferReadPos-this.recordLength)) ? reuse : null;
-               
-       }
-
-       /**
-        * Fills the read buffer by reading from the stdout stream of the 
external process.
-        * WARNING: We do not read from the error stream. This might cause a 
deadlock.
-        *  
-        * @return true if new content was filled into the buffer, false 
otherwise.
-        * @throws IOException
-        */
-       private boolean fillReadBuffer() throws IOException {
-               // TODO: Add reading from error stream of external process. 
Otherwise the InputFormat might get deadlocked!
-               
-               // stream was completely processed
-               if(noMoreStreamInput) {
-                       if(this.readBufferReadPos == this.readBufferFillPos) {
-                               this.noMoreRecordBuffers = true;
-                               return false;
-                       } else {
-                               throw new RuntimeException("External process 
produced incomplete record");
-                       }
-               }
-               
-               // the buffer was completely filled and processed
-               if(this.readBufferReadPos == this.readBuffer.length && 
-                               this.readBufferRemainSpace == 0) {
-                       // reset counters and fill again
-                       this.readBufferFillPos = 0;
-                       this.readBufferRemainSpace = this.readBuffer.length;
-                       this.readBufferReadPos = 0;
-               }
-               
-               // as long as not at least one record is complete
-               while(this.readBufferFillPos - this.readBufferReadPos < 
this.recordLength) {
-                       // read from stdout
-                       int readCnt = 
super.extProcOutStream.read(this.readBuffer, this.readBufferFillPos, 
this.readBufferRemainSpace);
-                       
-                       if(readCnt == -1) {
-                               // the is nothing more to read
-                               this.noMoreStreamInput = true;
-                               return false;
-                       } else {
-                               // update fill position and remain cnt
-                               this.readBufferFillPos += readCnt;
-                               this.readBufferRemainSpace -= readCnt;
-                       }
-               }
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
deleted file mode 100644
index cbf16b5..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.StringTokenizer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-
-/**
- * This input format starts an external process and reads its input from the 
standard out (stdout) of the started process.
- * The process is started outside of the JVM via a provided start command and 
can be an arbitrary program, e.g., a data generator or a shell script.
- * The input format checks the exit code of the process to validate whether 
the process terminated correctly. A list of allowed exit codes can be provided.
- * The input format requires ({@link ExternalProcessInputSplit} objects that 
hold the command to execute.
- * 
- * <b>Attention! </b><br>
- * You must take care to read from (and process) both output streams of the 
process, standard out (stdout) and standard error (stderr). 
- * Otherwise, the input format might get deadlocked! 
- * 
- *
- * @param <T> The type of the input split (must extend 
ExternalProcessInputSplit)
- */
-public abstract class ExternalProcessInputFormat<T extends 
ExternalProcessInputSplit> extends GenericInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       /**
-        * The config parameter lists (comma separated) all allowed exit codes
-        */
-       public static final String ALLOWEDEXITCODES_PARAMETER_KEY = 
"pact.input.externalProcess.allowedExitCodes";
-       
-       /**
-        * The external process
-        */
-       private Process extProc;
-       
-       /**
-        * The stdout stream of the external process
-        */
-       protected InputStream extProcOutStream;
-       
-       /**
-        * The stderr stream of the external process
-        */
-       protected InputStream extProcErrStream;
-       
-       /**
-        * Array of allowed exit codes
-        */
-       protected int[] allowedExitCodes;
-       
-
-       @Override
-       public void configure(Configuration parameters) {
-               // get allowed exit codes
-               String allowedExitCodesList = 
parameters.getString(ALLOWEDEXITCODES_PARAMETER_KEY, "0");
-               
-               // parse allowed exit codes
-               StringTokenizer st = new StringTokenizer(allowedExitCodesList, 
",");
-               this.allowedExitCodes = new int[st.countTokens()];
-               
-               for(int i=0; i<this.allowedExitCodes.length; i++) {
-                       this.allowedExitCodes[i] = 
Integer.parseInt(st.nextToken().trim());
-               }
-               
-       }
-       
-       @Override
-       public void close() throws IOException {
-               try {
-                       // get exit code
-                       int exitCode = this.extProc.exitValue();
-                       // check whether exit code is allowed
-                       boolean exitCodeOk = false;
-                       for (int allowedExitCode : this.allowedExitCodes) {
-                               if (allowedExitCode == exitCode) {
-                                       exitCodeOk = true;
-                                       break;
-                               }
-                       }
-                       if(!exitCodeOk) {
-                               // external process did not finish with an 
allowed exit code
-                               throw new RuntimeException("External process 
did not finish with an allowed exit code: "+exitCode);
-                       }
-               } catch(IllegalThreadStateException itse) {
-                       // process did not terminate yet, shut it down!
-                       this.extProc.destroy();
-                       if(!this.reachedEnd()) {
-                               throw new RuntimeException("External process 
was destroyed although stream was not fully read.");
-                       }
-               } finally {
-                       this.extProcErrStream.close();
-                       this.extProcOutStream.close();
-               }
-       }
-
-       @Override
-       public void open(GenericInputSplit split) throws IOException {
-               
-               if(!(split instanceof ExternalProcessInputSplit)) {
-                       throw new IOException("Invalid InputSplit type.");
-               }
-               
-               ExternalProcessInputSplit epSplit = 
(ExternalProcessInputSplit)split;           
-               
-               // check if process command is valid string
-               if(epSplit.getExternalProcessCommand() != null && 
!epSplit.getExternalProcessCommand().equals("")) {
-                       try {
-                               // run the external process
-                               this.extProc = 
Runtime.getRuntime().exec(epSplit.getExternalProcessCommand());
-                       } catch (IOException e) {
-                               throw new IOException("IO Exception when 
starting external process: "+epSplit.getExternalProcessCommand());
-                       }
-                       // connect streams to stdout and stderr
-                       this.extProcOutStream = this.extProc.getInputStream();
-                       this.extProcErrStream = this.extProc.getErrorStream();
-               } else {
-                       throw new IllegalArgumentException("External Process 
Command not set");
-               }
-       }
-       
-       public void waitForProcessToFinish() throws InterruptedException {
-               extProc.waitFor();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
deleted file mode 100644
index e087cb1..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.core.io.GenericInputSplit;
-
-/**
- * Input split for {@link org.apache.flink.api.common.io.InputFormat}s that read their
- * data from external processes. Every split carries the command that is executed to
- * produce the input for that split.
- * Each parallel instance of an InputFormat starts an external process and reads its output.
- * The command to start the external process must be executable on all nodes.
- *
- * @see ExternalProcessInputFormat
- * @see ExternalProcessFixedLengthInputFormat
- */
-public class ExternalProcessInputSplit extends GenericInputSplit {
-
-       private static final long serialVersionUID = 1L;
-
-       /** The command to be executed for this input split. */
-       private final String extProcessCommand;
-
-       /**
-        * Creates a split that carries the given command.
-        *
-        * @param splitNumber The number of the input split
-        * @param numSplits Handed to the {@link GenericInputSplit} constructor unchanged
-        *                  (presumably the total number of splits - confirm against GenericInputSplit)
-        * @param extProcCommand The command to be executed for the input split
-        */
-       public ExternalProcessInputSplit(int splitNumber, int numSplits, String extProcCommand) {
-               super(splitNumber, numSplits);
-               extProcessCommand = extProcCommand;
-       }
-
-       /**
-        * Gets the command whose execution yields the input for this split.
-        *
-        * @return the command exactly as it was passed to the constructor
-        */
-       public String getExternalProcessCommand() {
-               return extProcessCommand;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
deleted file mode 100644
index c73da71..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * Abstract base class for input formats that read {@link Record}s from a
- * file. It binds the element type of
- * {@code org.apache.flink.api.common.io.FileInputFormat} to {@link Record}
- * and declares no members of its own apart from the serialization version id.
- */
-public abstract class FileInputFormat extends 
org.apache.flink.api.common.io.FileInputFormat<Record> {
-       
-       private static final long serialVersionUID = -8819984594406641418L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
deleted file mode 100644
index c9591ee..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-
-/**
- * The abstract base class for all file based output formats that write
- * {@link Record}s. It binds the element type of
- * {@code org.apache.flink.api.common.io.FileOutputFormat} to {@link Record}
- * and declares no members of its own apart from the serialization version id;
- * the logic to open/close the target file streams is expected to come from
- * that parent class (not visible here).
- */
-public abstract class FileOutputFormat extends 
org.apache.flink.api.common.io.FileOutputFormat<Record> {
-       
-       private static final long serialVersionUID = 3832934435044920834L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
deleted file mode 100644
index e544eb9..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-
-/**
- * Base class for file input formats whose records all have the same, fixed length in bytes.
- * <p>
- * The record length is taken from the configuration entry {@link #RECORDLENGTH_PARAMETER_KEY}.
- * Records are treated as aligned to multiples of the record length, counted from file
- * position 0: a record that straddles the start of a split is left to the preceding split,
- * and a record that straddles the end of a split is read completely by this split.
- * Subclasses turn the raw bytes of one record into a {@link Record} by implementing
- * {@link #readBytes(Record, byte[], int)}.
- */
-public abstract class FixedLengthInputFormat extends FileInputFormat {
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * The config parameter which defines the fixed length of a record.
-        */
-       public static final String RECORDLENGTH_PARAMETER_KEY = "pact.fix-input.record-length";
-
-       /**
-        * The default read buffer size = 1MB.
-        */
-       private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
-
-       /**
-        * Buffer to read a batch of records from a file
-        */
-       private byte[] readBuffer;
-
-       /**
-        * The position in the stream
-        */
-       private long streamPos;
-
-       /**
-        * The end position in the stream.
-        */
-       private long streamEnd;
-
-       /**
-        * read position within the read buffer
-        */
-       private int readBufferPos;
-
-       /**
-        * The limit of the data in the read buffer.
-        */
-       private int readBufferLimit;
-
-       /**
-        * fixed length of all records
-        */
-       private int recordLength;
-
-       /**
-        * size of the read buffer
-        */
-       private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
-
-       /**
-        * The flag whether the stream is exhausted.
-        */
-       private boolean exhausted;
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates an unconfigured format; the record length is set in {@link #configure(Configuration)}.
-        */
-       protected FixedLengthInputFormat() {}
-
-       /**
-        * Reads a record out of the given buffer. This operation always consumes the standard number of
-        * bytes, regardless of whether the produced record was valid.
-        *
-        * @param target The target Record
-        * @param buffer The buffer containing the binary data.
-        * @param startPos The start position in the byte array.
-        * @return True, if the record is valid, false otherwise.
-        */
-       public abstract boolean readBytes(Record target, byte[] buffer, int startPos);
-
-       /**
-        * Returns the fixed length of a record.
-        *
-        * @return the fixed length of a record.
-        */
-       public int getRecordLength() {
-               return this.recordLength;
-       }
-
-       /**
-        * Gets the size of the buffer internally used to parse record boundaries.
-        * Before {@link #open(FileInputSplit)} has been called this is the not yet aligned
-        * default size; afterwards it is the length of the allocated buffer.
-        *
-        * @return The size of the parsing buffer.
-        */
-       public int getReadBufferSize() {
-               // read the size field instead of the array, which is still null before open()
-               return this.readBufferSize;
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Configures the parent format and reads the record length.
-        *
-        * @param parameters The configuration holding the entry {@link #RECORDLENGTH_PARAMETER_KEY}
-        * @throws IllegalArgumentException if the record length is missing or smaller than 1
-        */
-       @Override
-       public void configure(Configuration parameters) {
-               // pass parameters to FileInputFormat
-               super.configure(parameters);
-
-               // read own parameters
-               this.recordLength = parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0);
-               if (recordLength < 1) {
-                       throw new IllegalArgumentException("The record length parameter must be set and larger than 0.");
-               }
-       }
-
-       /**
-        * Opens the split, aligns its start and end to record boundaries and loads the first
-        * batch of records into the read buffer.
-        *
-        * @param split The split to read
-        * @throws IOException if opening, seeking or reading the underlying stream fails
-        */
-       @Override
-       public void open(FileInputSplit split) throws IOException {
-               // open input split using FileInputFormat
-               super.open(split);
-
-               // A record that straddles the split start is read by the preceding split (which
-               // extends its end, see below), so this split begins at the next record boundary.
-               final long startRemainder = this.splitStart % this.recordLength;
-               final long firstRecordPos = startRemainder == 0
-                               ? this.splitStart
-                               : this.splitStart + (this.recordLength - startRemainder);
-               if (firstRecordPos != this.splitStart) {
-                       // move start to next boundary
-                       this.stream.seek(firstRecordPos);
-               }
-               this.streamPos = firstRecordPos;
-
-               // Extend the end to the next record boundary so that a record straddling the
-               // split end is read completely. The value is not used by fillReadBuffer() when
-               // the split length is READ_WHOLE_SPLIT_FLAG.
-               long end = this.splitStart + this.splitLength;
-               final long endRemainder = end % this.recordLength;
-               if (endRemainder > 0) {
-                       end += this.recordLength - endRemainder;
-               }
-               this.streamEnd = end;
-
-               // round the buffer size up to a multiple of the record length; an already aligned
-               // size is kept, so repeated open() calls do not grow the buffer
-               final int bufferRemainder = this.readBufferSize % this.recordLength;
-               if (bufferRemainder != 0) {
-                       this.readBufferSize += this.recordLength - bufferRemainder;
-               }
-
-               if (this.readBuffer == null || this.readBuffer.length != this.readBufferSize) {
-                       this.readBuffer = new byte[this.readBufferSize];
-               }
-
-               this.readBufferLimit = 0;
-               this.readBufferPos = 0;
-               this.exhausted = false;
-               fillReadBuffer();
-       }
-
-       /**
-        * Returns the statistics of the parent format, with the average record width replaced
-        * by the fixed record length.
-        *
-        * @param cachedStats Previously gathered statistics, handed to the parent implementation
-        * @return The statistics, or null if the parent implementation returned null
-        * @throws IOException if the parent implementation fails to gather the statistics
-        */
-       @Override
-       public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-               final FileBaseStatistics stats = super.getStatistics(cachedStats);
-               return stats == null ? null :
-                       new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), this.recordLength);
-       }
-
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * @return True once no more data could be loaded into the read buffer.
-        */
-       @Override
-       public boolean reachedEnd() {
-               return this.exhausted;
-       }
-
-       /**
-        * Parses the next record from the read buffer.
-        *
-        * @param reuse The record to fill
-        * @return The filled record, or null if the input is exhausted or
-        *         {@link #readBytes(Record, byte[], int)} reported the record as invalid
-        * @throws IOException if reading fails or only a part of a record is available
-        */
-       @Override
-       public Record nextRecord(Record reuse) throws IOException {
-               if (this.readBufferLimit - this.readBufferPos <= 0) {
-                       // get another buffer
-                       fillReadBuffer();
-                       // check if source is exhausted
-                       if (this.exhausted) {
-                               return null;
-                       }
-               }
-
-               // also checked right after a refill: never hand a partial record to readBytes
-               if (this.readBufferLimit - this.readBufferPos < this.recordLength) {
-                       throw new IOException("Unable to read full record");
-               }
-
-               boolean val = readBytes(reuse, this.readBuffer, this.readBufferPos);
-
-               this.readBufferPos += this.recordLength;
-               if (this.readBufferPos >= this.readBufferLimit) {
-                       // load the next batch eagerly so that reachedEnd() is accurate after the last record
-                       fillReadBuffer();
-               }
-               return val ? reuse : null;
-       }
-
-       /**
-        * Fills the next read buffer from the file stream. Sets the exhausted flag when no
-        * further bytes are available.
-        *
-        * @throws IOException if reading from the stream fails
-        */
-       private void fillReadBuffer() throws IOException {
-               final int toRead;
-               if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
-                       // special case for compressed files: read until the stream itself ends
-                       toRead = this.readBufferSize;
-               } else {
-                       toRead = (int) Math.min(this.streamEnd - this.streamPos, this.readBufferSize);
-                       if (toRead <= 0) {
-                               this.exhausted = true;
-                               return;
-                       }
-               }
-
-               // A single read() may deliver fewer bytes than requested without the stream being
-               // at its end, so keep reading until the batch is complete or no more data arrives.
-               int filled = 0;
-               while (filled < toRead) {
-                       final int read = this.stream.read(this.readBuffer, filled, toRead - filled);
-                       if (read <= 0) {
-                               break;
-                       }
-                       filled += read;
-               }
-
-               if (filled == 0) {
-                       this.exhausted = true;
-               } else {
-                       this.streamPos += filled;
-                       this.readBufferPos = 0;
-                       this.readBufferLimit = filled;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
deleted file mode 100644
index 46fa6e5..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * Generic base class for all inputs that are not based on files, specific to
- * {@link Record}. It binds the element type of
- * {@code org.apache.flink.api.common.io.GenericInputFormat} to {@link Record}
- * and declares no members of its own apart from the serialization version id.
- */
-public abstract class GenericInputFormat extends 
org.apache.flink.api.common.io.GenericInputFormat<Record> {
-       private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
deleted file mode 100644
index c606458..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-
-/**
- * Base implementation for an input format that returns each line as a 
separate record that contains
- * only a single string, namely the line.
- */
-public class TextInputFormat extends DelimitedInputFormat {
-       private static final long serialVersionUID = 1L;
-       
-       public static final String CHARSET_NAME = "textformat.charset";
-       
-       public static final String FIELD_POS = "textformat.pos";
-       
-       public static final String DEFAULT_CHARSET_NAME = "UTF-8";
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(TextInputFormat.class);
-       
-       
-       protected final StringValue theString = new StringValue();
-       
-       
-       // all fields below here are set in configure / open, so we do not 
serialze them
-       
-       protected transient CharsetDecoder decoder;
-       
-       protected transient ByteBuffer byteWrapper;
-       
-       protected transient int pos;
-       
-       protected transient boolean ascii;
-       
-       /**
-        * Code of \r, used to remove \r from a line when the line ends with 
\r\n
-        */
-       private static final byte CARRIAGE_RETURN = (byte) '\r';
-
-       /**
-        * Code of \n, used to identify if \n is used as delimiter
-        */
-       private static final byte NEW_LINE = (byte) '\n';
-
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public void configure(Configuration parameters) {
-               super.configure(parameters);
-               
-               // get the charset for the decoding
-               String charsetName = parameters.getString(CHARSET_NAME, 
DEFAULT_CHARSET_NAME);
-               if (charsetName == null || !Charset.isSupported(charsetName)) {
-                       throw new RuntimeException("Unsupported charset: " + 
charsetName);
-               }
-               
-               if (charsetName.equals("ISO-8859-1") || 
charsetName.equalsIgnoreCase("ASCII")) {
-                       this.ascii = true;
-               } else {
-                       this.decoder = 
Charset.forName(charsetName).newDecoder();
-                       this.byteWrapper = ByteBuffer.allocate(1);
-               }
-               
-               // get the field position to write in the record
-               this.pos = parameters.getInteger(FIELD_POS, 0);
-               if (this.pos < 0) {
-                       throw new RuntimeException("Illegal configuration value 
for the target position: " + this.pos);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       public Record readRecord(Record reuse, byte[] bytes, int offset, int 
numBytes) {
-               StringValue str = this.theString;
-               
-               //Check if \n is used as delimiter and the end of this line is 
a \r, then remove \r from the line
-               if (this.getDelimiter() != null && this.getDelimiter().length 
== 1 
-                               && this.getDelimiter()[0] == NEW_LINE && 
offset+numBytes >= 1 
-                               && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
-                       numBytes -= 1;
-               }
-
-               
-               if (this.ascii) {
-                       str.setValueAscii(bytes, offset, numBytes);
-               }
-               else {
-                       ByteBuffer byteWrapper = this.byteWrapper;
-                       if (bytes != byteWrapper.array()) {
-                               byteWrapper = ByteBuffer.wrap(bytes, 0, 
bytes.length);
-                               this.byteWrapper = byteWrapper;
-                       }
-                       byteWrapper.limit(offset + numBytes);
-                       byteWrapper.position(offset);
-                               
-                       try {
-                               CharBuffer result = 
this.decoder.decode(byteWrapper);
-                               str.setValue(result);
-                       }
-                       catch (CharacterCodingException e) {
-                               byte[] copy = new byte[numBytes];
-                               System.arraycopy(bytes, offset, copy, 0, 
numBytes);
-                               LOG.warn("Line could not be encoded: " + 
Arrays.toString(copy), e);
-                               return null;
-                       }
-               }
-               
-               reuse.clear();
-               reuse.setField(this.pos, str);
-               return reuse;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
deleted file mode 100644
index 29dab6a..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.types.Record;
-
-/**
- * Bulk iteration operator of the Record API; it fixes the element type of
- * {@link BulkIterationBase} to {@link Record}.
- * <p>
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- */
-@Deprecated
-public class BulkIteration extends BulkIterationBase<Record> {
-
-       /**
-        * Creates a bulk iteration without passing a name to the base class.
-        */
-       public BulkIteration() {
-               super(OperatorInfoHelper.unary());
-       }
-
-       /**
-        * Creates a bulk iteration with the given name.
-        *
-        * @param name The name handed to the base class
-        */
-       public BulkIteration(String name) {
-               super(OperatorInfoHelper.unary(), name);
-       }
-
-       /**
-        * Specialized operator to use as a recognizable place-holder for the input to the
-        * step function when composing the nested data flow.
-        */
-       public static class PartialSolutionPlaceHolder extends BulkIterationBase.PartialSolutionPlaceHolder<Record> {
-
-               /**
-                * @param container The iteration this place-holder is created for
-                */
-               public PartialSolutionPlaceHolder(BulkIterationBase<Record> container) {
-                       super(container, OperatorInfoHelper.unary());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
deleted file mode 100644
index 09811d6..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.operators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.google.common.base.Preconditions;
-
-/**
- * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records 
sharing
- * the same key (one group per input).
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * @see CoGroupFunction
- */
-@Deprecated
-public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, 
Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, 
Record>> implements RecordOperator {
-       
-       /**
-        * The types of the keys that the operator groups on.
-        */
-       private final Class<? extends Key<?>>[] keyTypes;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Creates a Builder with the provided {@link CoGroupFunction} 
implementation.
-        * 
-        * @param udf The {@link CoGroupFunction} implementation for this 
CoGroup operator.
-        * @param keyClass The class of the key data type.
-        * @param keyColumn1 The position of the key in the first input's 
records.
-        * @param keyColumn2 The position of the key in the second input's 
records.
-        */
-       public static Builder builder(CoGroupFunction udf, Class<? extends 
Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-               WrappingCoGroupFunction wrapper = new 
WrappingCoGroupFunction(udf);
-               return new Builder(new 
UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
-       }
-       
-       /**
-        * Creates a Builder with the provided {@link CoGroupFunction} 
implementation.
-        * 
-        * @param udf The {@link CoGroupFunction} implementation for this 
CoGroup operator.
-        * @param keyClass The class of the key data type.
-        * @param keyColumn1 The position of the key in the first input's 
records.
-        * @param keyColumn2 The position of the key in the second input's 
records.
-        */
-       public static Builder builder(Class<? extends CoGroupFunction> udf, 
Class<? extends Key<?>> keyClass,
-                       int keyColumn1, int keyColumn2)
-       {
-               WrappingCoGroupFunction wrapper = new 
WrappingClassCoGroupFunction(udf);
-               return new Builder(new 
UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
-       }
-       
-       /**
-        * The private constructor that only gets invoked from the Builder.
-        * @param builder
-        */
-       protected CoGroupOperator(Builder builder) {
-               super(builder.udf, OperatorInfoHelper.binary(), 
builder.getKeyColumnsArray1(), builder.getKeyColumnsArray2(), builder.name);
-               this.keyTypes = builder.getKeyClassesArray();
-               
-               if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
-                       
setFirstInput(Operator.createUnionCascade(builder.inputs1));
-               }
-               if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
-                       
setSecondInput(Operator.createUnionCascade(builder.inputs2));
-               }
-
-               // sanity check solution set key mismatches
-               if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-                       int[] positions = getKeyColumns(0);
-                       ((DeltaIteration.SolutionSetPlaceHolder) 
input1).checkJoinKeyFields(positions);
-               }
-               if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-                       int[] positions = getKeyColumns(1);
-                       ((DeltaIteration.SolutionSetPlaceHolder) 
input2).checkJoinKeyFields(positions);
-               }
-               
-               setBroadcastVariables(builder.broadcastInputs);
-               setGroupOrderForInputOne(builder.secondaryOrder1);
-               setGroupOrderForInputTwo(builder.secondaryOrder2);
-               
-               CoGroupFunction function = ((WrappingCoGroupFunction) 
builder.udf.getUserCodeObject()).getWrappedFunction();
-               
setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new 
UserCodeObjectWrapper<CoGroupFunction>(function)));
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       
-       @Override
-       public Class<? extends Key<?>>[] getKeyClasses() {
-               return this.keyTypes;
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       
-       /**
-        * Builder pattern, straight from Joshua Bloch's Effective Java (2nd 
Edition).
-        */
-       public static class Builder {
-               
-               /* The required parameters */
-               private final 
UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, 
Record, Record>> udf;
-               private final List<Class<? extends Key<?>>> keyClasses;
-               private final List<Integer> keyColumns1;
-               private final List<Integer> keyColumns2;
-               
-               /* The optional parameters */
-               private List<Operator<Record>> inputs1;
-               private List<Operator<Record>> inputs2;
-               private Map<String, Operator<Record>> broadcastInputs;
-               private Ordering secondaryOrder1;
-               private Ordering secondaryOrder2;
-               private String name;
-               
-               /**
-                * Creates a Builder with the provided {@link CoGroupFunction} 
implementation.
-                * 
-                * @param udf The {@link CoGroupFunction} implementation for 
this CoGroup operator.
-                * @param keyClass The class of the key data type.
-                * @param keyColumn1 The position of the key in the first 
input's records.
-                * @param keyColumn2 The position of the key in the second 
input's records.
-                */
-               protected 
Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>> udf, Class<? extends Key<?>> keyClass,
-                               int keyColumn1, int keyColumn2)
-               {
-                       this.udf = udf;
-                       this.keyClasses = new ArrayList<Class<? extends 
Key<?>>>();
-                       this.keyClasses.add(keyClass);
-                       this.keyColumns1 = new ArrayList<Integer>();
-                       this.keyColumns1.add(keyColumn1);
-                       this.keyColumns2 = new ArrayList<Integer>();
-                       this.keyColumns2.add(keyColumn2);
-                       this.inputs1 = new ArrayList<Operator<Record>>();
-                       this.inputs2 = new ArrayList<Operator<Record>>();
-                       this.broadcastInputs = new HashMap<String, 
Operator<Record>>();
-               }
-               
-               /**
-                * Creates a Builder with the provided {@link CoGroupFunction} 
implementation. This method is intended
-                * for special case sub-types only.
-                * 
-                * @param udf The {@link CoGroupFunction} implementation for 
this CoGroup operator.
-                */
-               protected 
Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record,
 Record, Record>> udf) {
-                       this.udf = udf;
-                       this.keyClasses = new ArrayList<Class<? extends 
Key<?>>>();
-                       this.keyColumns1 = new ArrayList<Integer>();
-                       this.keyColumns2 = new ArrayList<Integer>();
-                       this.inputs1 = new ArrayList<Operator<Record>>();
-                       this.inputs2 = new ArrayList<Operator<Record>>();
-                       this.broadcastInputs = new HashMap<String, 
Operator<Record>>();
-               }
-               
-               private int[] getKeyColumnsArray1() {
-                       int[] result = new int[keyColumns1.size()];
-                       for (int i = 0; i < keyColumns1.size(); ++i) {
-                               result[i] = keyColumns1.get(i);
-                       }
-                       return result;
-               }
-               
-               private int[] getKeyColumnsArray2() {
-                       int[] result = new int[keyColumns2.size()];
-                       for (int i = 0; i < keyColumns2.size(); ++i) {
-                               result[i] = keyColumns2.get(i);
-                       }
-                       return result;
-               }
-               
-               @SuppressWarnings("unchecked")
-               private Class<? extends Key<?>>[] getKeyClassesArray() {
-                       return keyClasses.toArray(new Class[keyClasses.size()]);
-               }
-
-               /**
-                * Adds additional key field.
-                * 
-                * @param keyClass The class of the key data type.
-                * @param keyColumn1 The position of the key in the first 
input's records.
-                * @param keyColumn2 The position of the key in the second 
input's records.
-                */
-               public Builder keyField(Class<? extends Key<?>> keyClass, int 
keyColumn1, int keyColumn2) {
-                       keyClasses.add(keyClass);
-                       keyColumns1.add(keyColumn1);
-                       keyColumns2.add(keyColumn2);
-                       return this;
-               }
-               /**
-                * Sets the order of the elements within a group for the first 
input.
-                * 
-                * @param order The order for the elements in a group.
-                */
-               public Builder secondaryOrder1(Ordering order) {
-                       this.secondaryOrder1 = order;
-                       return this;
-               }
-               
-               /**
-                * Sets the order of the elements within a group for the second 
input.
-                * 
-                * @param order The order for the elements in a group.
-                */
-               public Builder secondaryOrder2(Ordering order) {
-                       this.secondaryOrder2 = order;
-                       return this;
-               }
-               
-               /**
-                * Sets the input operator for input 1.
-                * 
-                * @param input The input operator for input 1. 
-                */
-               public Builder input1(Operator<Record> input) {
-                       Preconditions.checkNotNull(input, "The input must not 
be null");
-                       
-                       this.inputs1.clear();
-                       this.inputs1.add(input);
-                       return this;
-               }
-               
-               /**
-                * Sets one or several inputs (union) for input 1.
-                * 
-                * @param inputs
-                */
-               public Builder input1(Operator<Record>...inputs) {
-                       this.inputs1.clear();
-                       for (Operator<Record> c : inputs) {
-                               this.inputs1.add(c);
-                       }
-                       return this;
-               }
-               
-               /**
-                * Sets the input operator for input 2.
-                * 
-                * @param input The input operator for input 2. 
-                */
-               public Builder input2(Operator<Record> input) {
-                       Preconditions.checkNotNull(input, "The input must not 
be null");
-                       
-                       this.inputs2.clear();
-                       this.inputs2.add(input);
-                       return this;
-               }
-               
-               /**
-                * Sets one or several inputs (union) for input 2.
-                * 
-                * @param inputs
-                */
-               public Builder input2(Operator<Record>...inputs) {
-                       this.inputs2.clear();
-                       for (Operator<Record> c : inputs) {
-                               this.inputs2.add(c);
-                       }
-                       return this;
-               }
-               
-               /**
-                * Sets the first inputs.
-                * 
-                * @param inputs
-                */
-               public Builder inputs1(List<Operator<Record>> inputs) {
-                       this.inputs1 = inputs;
-                       return this;
-               }
-               
-               /**
-                * Sets the second inputs.
-                * 
-                * @param inputs
-                */
-               public Builder inputs2(List<Operator<Record>> inputs) {
-                       this.inputs2 = inputs;
-                       return this;
-               }
-               
-               /**
-                * Binds the result produced by a plan rooted at {@code root} 
to a 
-                * variable used by the UDF wrapped in this operator.
-                */
-               public Builder setBroadcastVariable(String name, 
Operator<Record> input) {
-                       this.broadcastInputs.put(name, input);
-                       return this;
-               }
-               
-               /**
-                * Binds multiple broadcast variables.
-                */
-               public Builder setBroadcastVariables(Map<String, 
Operator<Record>> inputs) {
-                       this.broadcastInputs.clear();
-                       this.broadcastInputs.putAll(inputs);
-                       return this;
-               }
-               
-               /**
-                * Sets the name of this operator.
-                * 
-                * @param name
-                */
-               public Builder name(String name) {
-                       this.name = name;
-                       return this;
-               }
-               
-               /**
-                * Creates and returns a CoGroupOperator from using the values 
given 
-                * to the builder.
-                * 
-                * @return The created operator
-                */
-               public CoGroupOperator build() {
-                       if (keyClasses.size() <= 0) {
-                               throw new IllegalStateException("At least one 
key attribute has to be set.");
-                       }
-                       
-                       if (name == null) {
-                               name = udf.getUserCodeClass().getName();
-                       }
-                       return new CoGroupOperator(this);
-               }
-       }
-       
-       // 
============================================================================================
-       
-       public static class WrappingCoGroupFunction extends 
WrappingFunction<CoGroupFunction> 
-                       implements 
org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
-               
-               private static final long serialVersionUID = 1L;
-               
-               public WrappingCoGroupFunction(CoGroupFunction coGrouper) {
-                       super(coGrouper);
-               }
-               
-               @Override
-               public void coGroup(Iterable<Record> records1, Iterable<Record> 
records2, Collector<Record> out) throws Exception {
-                       this.wrappedFunction.coGroup(records1.iterator(), 
records2.iterator(), out);
-               }
-       }
-       
-       public static final class WrappingClassCoGroupFunction extends 
WrappingCoGroupFunction {
-               
-               private static final long serialVersionUID = 1L;
-               
-               public WrappingClassCoGroupFunction(Class<? extends 
CoGroupFunction> reducer) {
-                       super(InstantiationUtil.instantiate(reducer));
-               }
-               
-               private void writeObject(ObjectOutputStream out) throws 
IOException {
-                       out.writeObject(wrappedFunction.getClass());
-               }
-
-               private void readObject(ObjectInputStream in) throws 
IOException, ClassNotFoundException {
-                       Class<?> clazz = (Class<?>) in.readObject();
-                       this.wrappedFunction = (CoGroupFunction) 
InstantiationUtil.instantiate(clazz);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
deleted file mode 100644
index 2b84b73..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.java.record.io.CollectionInputFormat;
-import org.apache.flink.types.Record;
-
-/**
- * 
- *  * <b>NOTE: The Record API is marked as deprecated. It is not being 
developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Operator for input nodes which reads data from collection or iterator.
- * Use this operator if you want to use a collection of data from the 
application 
- * as an input for a job to the cluster.
- * There are two main ways to use the CollectionDataSource:
- * 
- * Using an {@link java.util.Iterator} that is also {@link 
java.io.Serializable}.
- * <pre>
- * CollectionDataSource source = new 
CollectionDataSource(mySerializableIterator, &quot;IterSource&quot;);
- * </pre>
- * 
- * Using a Collection of Java Objects.
- * 
- * <pre>
- * CollectionDataSource source2 = new CollectionDataSource(new 
List&lt;String&gt;(), &quot;Collection source&quot;);
- * </pre>
- * 
- * Note that you can as many elements as you want to the constructor:
- * 
- * <pre>
- * CollectionDataSource(&quot;Varargs String source&quot;, &quot;some&quot;, 
&quot;strings&quot;, &quot;that&quot;, &quot;get&quot;, 
&quot;distributed&quot;);
- * </pre>
- * 
- * The only limitation is that the elements need to have the same type.
- */
-@Deprecated
-public class CollectionDataSource extends GenericDataSourceBase<Record, 
GenericInputFormat<Record>> {
-
-       private static String DEFAULT_NAME = "<Unnamed Collection Data Source>";
-
-       /**
-        * Creates a new instance for the given input using the given input 
format.
-        * 
-        * @param f
-        *        The {@link CollectionInputFormat} implementation used to read 
the data.
-        * @param data
-        *        The input data. It should be a collection, an array or a 
serializable iterator.
-        * @param name
-        *        The given name for the Pact, used in plans, logs and progress 
messages.
-        */
-       public CollectionDataSource(CollectionInputFormat f, String name, 
Object... data) {
-               super(f, OperatorInfoHelper.source(), name);
-               Collection<Object> tmp = new ArrayList<Object>();
-               for (Object o : data) {
-                       tmp.add(o);
-               }
-               checkFormat(tmp);
-               f.setData(tmp);
-       }
-
-       public CollectionDataSource(CollectionInputFormat f, String name, 
Object[][] data) {
-               super(f, OperatorInfoHelper.source(), name);
-               Collection<Object> tmp = new ArrayList<Object>();
-               for (Object o : data) {
-                       tmp.add(o);
-               }
-               checkFormat(tmp);
-               f.setData(tmp);
-       }
-
-       public CollectionDataSource(CollectionInputFormat f, Collection<?> 
data, String name) {
-               super(f, OperatorInfoHelper.source(), name);
-               checkFormat(data);
-               f.setData(data);
-       }
-
-       public <T extends Iterator<?>, Serializable> 
CollectionDataSource(CollectionInputFormat f, T data, String name) {
-               super(f, OperatorInfoHelper.source(), name);
-               f.setIter(data);
-       }
-
-       /**
-        * Creates a new instance for the given input using the given input 
format. The contract has the default name.
-        * The input types will be checked. If the input types don't agree, an 
exception will occur.
-        * 
-        * @param args
-        *        The input data. It should be a collection, an array or a 
serializable iterator.
-        * @param name
-        *        The given name for the Pact, used in plans, logs and progress 
messages.
-        */
-       public CollectionDataSource(String name, Object... args) {
-               this(new CollectionInputFormat(), name, args);
-       }
-
-       public CollectionDataSource(String name, Object[][] args) {
-               this(new CollectionInputFormat(), name, args);
-       }
-
-       public CollectionDataSource(Collection<?> args, String name) {
-               this(new CollectionInputFormat(), args, name);
-       }
-
-       public <T extends Iterator<?>, Serializable> CollectionDataSource(T 
args, String name) {
-               this(new CollectionInputFormat(), args, name);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       /**
-        * for scala compatible, scala-to-java type conversion always has an 
object wrapper
-        */
-       public CollectionDataSource(Object... args) {
-               this(new CollectionInputFormat(), args);
-       }
-
-       @SuppressWarnings("unchecked")
-       public CollectionDataSource(CollectionInputFormat f, Object... data) {
-               super(f, OperatorInfoHelper.source(), DEFAULT_NAME);
-               if (data.length == 1 && data[0] instanceof Iterator) {
-                       f.setIter((Iterator<Object>) data[0]);
-               }
-               else if (data.length == 1 && data[0] instanceof Collection) {
-                       checkFormat((Collection<Object>) data[0]);
-                       f.setData((Collection<Object>) data[0]);
-               }
-               else {
-                       Collection<Object> tmp = new ArrayList<Object>();
-                       for (Object o : data) {
-                               tmp.add(o);
-                       }
-                       checkFormat(tmp);
-                       f.setData(tmp);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       /*
-        * check whether the input field has the same type
-        */
-       private <T> void checkFormat(Collection<T> c) {
-               Class<?> type = null;
-               List<Class<?>> typeList = new ArrayList<Class<?>>();
-               Iterator<T> it = c.iterator();
-               while (it.hasNext()) {
-                       Object o = it.next();
-
-                       // check the input types for 1-dimension
-                       if (type != null && !type.equals(o.getClass())) {
-                               throw new RuntimeException("elements of input 
list should have the same type");
-                       }
-                       else {
-                               type = o.getClass();
-                       }
-
-                       // check the input types for 2-dimension array
-                       if (typeList.size() == 0 && o.getClass().isArray()) {
-                               for (Object s : (Object[]) o) {
-                                       typeList.add(s.getClass());
-                               }
-                       }
-                       else if (o.getClass().isArray()) {
-                               int index = 0;
-                               if (((Object[]) o).length != typeList.size()) {
-                                       throw new RuntimeException("elements of 
input list should have the same size");
-                               }
-                               for (Object s : (Object[]) o) {
-                                       if 
(!s.getClass().equals(typeList.get(index++))) {
-                                               throw new 
RuntimeException("elements of input list should have the same type");
-                                       }
-                               }
-                       }
-
-                       // check the input types for 2-dimension collection
-                       if (typeList.size() == 0 && o instanceof Collection) {
-                               @SuppressWarnings("unchecked")
-                               Iterator<Object> tmpIt = ((Collection<Object>) 
o).iterator();
-                               while (tmpIt.hasNext()) {
-                                       Object s = tmpIt.next();
-                                       typeList.add(s.getClass());
-                               }
-                       }
-                       else if (o instanceof Collection) {
-                               int index = 0;
-                               @SuppressWarnings("unchecked")
-                               Iterator<Object> tmpIt = ((Collection<Object>) 
o).iterator();
-                               while (tmpIt.hasNext()) {
-                                       Object s = tmpIt.next();
-                                       if 
(!s.getClass().equals(typeList.get(index++))) {
-                                               throw new 
RuntimeException("elements of input list should have the same type");
-                                       }
-                               }
-
-                               if (index != typeList.size()) {
-                                       throw new RuntimeException("elements of 
input list should have the same size");
-                               }
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
deleted file mode 100644
index 80e89a5..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * 
- * CrossOperator that applies a {@link CrossFunction} to each element of the 
Cartesian Product.
- * 
- * @see CrossFunction
- */
-@Deprecated
-public class CrossOperator extends CrossOperatorBase<Record, Record, Record, 
CrossFunction> implements RecordOperator {
-
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(CrossFunction udf) {
-               return new Builder(new 
UserCodeObjectWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(Class<? extends CrossFunction> udf) {
-               return new Builder(new 
UserCodeClassWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * The private constructor that only gets invoked from the Builder.
-        * @param builder
-        */
-       protected CrossOperator(Builder builder) {
-               super(builder.udf, OperatorInfoHelper.binary(), builder.name);
-               
-               if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
-                       
setFirstInput(Operator.createUnionCascade(builder.inputs1));
-               }
-               if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
-                       
setSecondInput(Operator.createUnionCascade(builder.inputs2));
-               }
-               
-               setBroadcastVariables(builder.broadcastInputs);
-               
setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
-       }
-       
-
-       @Override
-       public Class<? extends Key<?>>[] getKeyClasses() {
-               return emptyClassArray();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Builder pattern, straight from Joshua Bloch's Effective Java (2nd 
Edition).
-        */
-       public static class Builder {
-               
-               /* The required parameters */
-               private final UserCodeWrapper<CrossFunction> udf;
-               
-               /* The optional parameters */
-               private List<Operator<Record>> inputs1;
-               private List<Operator<Record>> inputs2;
-               private Map<String, Operator<Record>> broadcastInputs;
-               private String name;
-               
-               /**
-                * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-                * 
-                * @param udf The {@link CrossFunction} implementation for this 
Cross operator.
-                */
-               protected Builder(UserCodeWrapper<CrossFunction> udf) {
-                       this.udf = udf;
-                       this.inputs1 = new ArrayList<Operator<Record>>();
-                       this.inputs2 = new ArrayList<Operator<Record>>();
-                       this.broadcastInputs = new HashMap<String, 
Operator<Record>>();
-               }
-               
-               /**
-                * Sets the first input.
-                * 
-                * @param input The first input.
-                */
-               public Builder input1(Operator<Record> input) {
-                       Preconditions.checkNotNull(input, "The input must not 
be null");
-                       
-                       this.inputs1.clear();
-                       this.inputs1.add(input);
-                       return this;
-               }
-               
-               /**
-                * Sets the second input.
-                * 
-                * @param input The second input.
-                */
-               public Builder input2(Operator<Record> input) {
-                       Preconditions.checkNotNull(input, "The input must not 
be null");
-                       
-                       this.inputs2.clear();
-                       this.inputs2.add(input);
-                       return this;
-               }
-               
-               /**
-                * Sets one or several inputs (union) for input 1.
-                * 
-                * @param inputs
-                */
-               public Builder input1(Operator<Record>...inputs) {
-                       this.inputs1.clear();
-                       for (Operator<Record> c : inputs) {
-                               this.inputs1.add(c);
-                       }
-                       return this;
-               }
-               
-               /**
-                * Sets one or several inputs (union) for input 2.
-                * 
-                * @param inputs
-                */
-               public Builder input2(Operator<Record>...inputs) {
-                       this.inputs2.clear();
-                       for (Operator<Record> c : inputs) {
-                               this.inputs2.add(c);
-                       }
-                       return this;
-               }
-               
-               /**
-                * Sets the first inputs.
-                * 
-                * @param inputs
-                */
-               public Builder inputs1(List<Operator<Record>> inputs) {
-                       this.inputs1 = inputs;
-                       return this;
-               }
-               
-               /**
-                * Sets the second inputs.
-                * 
-                * @param inputs
-                */
-               public Builder inputs2(List<Operator<Record>> inputs) {
-                       this.inputs2 = inputs;
-                       return this;
-               }
-               
-               /**
-                * Binds the result produced by a plan rooted at {@code input} 
to a 
-                * variable used by the UDF wrapped in this operator.
-                */
-               public Builder setBroadcastVariable(String name, 
Operator<Record> input) {
-                       this.broadcastInputs.put(name, input);
-                       return this;
-               }
-               
-               /**
-                * Binds multiple broadcast variables.
-                */
-               public Builder setBroadcastVariables(Map<String, 
Operator<Record>> inputs) {
-                       this.broadcastInputs.clear();
-                       this.broadcastInputs.putAll(inputs);
-                       return this;
-               }
-               
-               /**
-                * Sets the name of this operator.
-                * 
-                * @param name
-                */
-               public Builder name(String name) {
-                       this.name = name;
-                       return this;
-               }
-               
-               /**
-                * Creates and returns a CrossOperator using the values 
given 
-                * to the builder.
-                * 
-                * @return The created operator
-                */
-               public CrossOperator build() {
-                       setNameIfUnset();
-                       return new CrossOperator(this);
-               }
-               
-               protected void setNameIfUnset() {
-                       if (name == null) {
-                               name = udf.getUserCodeClass().getName();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
deleted file mode 100644
index f4a2ad1..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithLarge;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-
-
-/**
- * 
- *  <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * This operator represents a Cartesian-Product operation. Of the two inputs, 
the first is expected to be large
- * and the second is expected to be small. 
- * 
- * @see CrossFunction
- */
-
-@Deprecated
-public class CrossWithLargeOperator extends CrossOperator implements 
CrossWithLarge {
-       
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(CrossFunction udf) {
-               return new Builder(new 
UserCodeObjectWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(Class<? extends CrossFunction> udf) {
-               return new Builder(new 
UserCodeClassWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * The private constructor that only gets invoked from the Builder.
-        * @param builder
-        */
-       protected CrossWithLargeOperator(Builder builder) {
-               super(builder);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Builder pattern, straight from Joshua Bloch's Effective Java (2nd 
Edition).
-        */
-       public static class Builder extends CrossOperator.Builder {
-               
-               /**
-                * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-                * 
-                * @param udf The {@link CrossFunction} implementation for this 
Cross operator.
-                */
-               private Builder(UserCodeWrapper<CrossFunction> udf) {
-                       super(udf);
-               }
-               
-               /**
-                * Creates and returns a CrossWithLargeOperator using the values 
given 
-                * to the builder.
-                * 
-                * @return The created operator
-                */
-               @Override
-               public CrossWithLargeOperator build() {
-                       setNameIfUnset();
-                       return new CrossWithLargeOperator(this);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
deleted file mode 100644
index 9c9b758..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithSmall;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed 
anymore and will be removed from
- * the code at some point.
- * See <a 
href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more 
details.</b>
- * 
- * 
- * This operator represents a Cartesian-Product operation. Of the two inputs, 
the first is expected to be large
- * and the second is expected to be small. 
- * 
- * @see CrossFunction
- */
-@Deprecated
-public class CrossWithSmallOperator extends CrossOperator implements 
CrossWithSmall {
-       
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(CrossFunction udf) {
-               return new Builder(new 
UserCodeObjectWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-        * 
-        * @param udf The {@link CrossFunction} implementation for this Cross 
operator.
-        */
-       public static Builder builder(Class<? extends CrossFunction> udf) {
-               return new Builder(new 
UserCodeClassWrapper<CrossFunction>(udf));
-       }
-       
-       /**
-        * The private constructor that only gets invoked from the Builder.
-        * @param builder
-        */
-       protected CrossWithSmallOperator(Builder builder) {
-               super(builder);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Builder pattern, straight from Joshua Bloch's Effective Java (2nd 
Edition).
-        */
-       public static class Builder extends CrossOperator.Builder {
-               
-               /**
-                * Creates a Builder with the provided {@link CrossFunction} 
implementation.
-                * 
-                * @param udf The {@link CrossFunction} implementation for this 
Cross operator.
-                */
-               private Builder(UserCodeWrapper<CrossFunction> udf) {
-                       super(udf);
-               }
-               
-               /**
-                * Creates and returns a CrossWithSmallOperator using the values 
given 
-                * to the builder.
-                * 
-                * @return The created operator
-                */
-               @Override
-               public CrossWithSmallOperator build() {
-                       setNameIfUnset();
-                       return new CrossWithSmallOperator(this);
-               }
-       }
-}

Reply via email to