[FLINK-2900] [hadoop-compat] Remove Record API code from Hadoop Compat module

This closes #1293


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ff04baa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ff04baa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ff04baa

Branch: refs/heads/master
Commit: 2ff04baa649831fe66a7fa1f3d5b4d7adf632261
Parents: a8f1be9
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Oct 22 21:12:15 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Oct 29 10:33:55 2015 +0100

----------------------------------------------------------------------
 .../mapred/record/HadoopDataSink.java           |  98 ----------
 .../mapred/record/HadoopDataSource.java         |  82 --------
 .../mapred/record/HadoopRecordInputFormat.java  | 174 ----------------
 .../mapred/record/HadoopRecordOutputFormat.java | 156 ---------------
 .../datatypes/DefaultFlinkTypeConverter.java    |  95 ---------
 .../datatypes/DefaultHadoopTypeConverter.java   |  83 --------
 .../record/datatypes/FlinkTypeConverter.java    |  43 ----
 .../datatypes/HadoopFileOutputCommitter.java    | 196 -------------------
 .../record/datatypes/HadoopTypeConverter.java   |  42 ----
 .../datatypes/WritableComparableWrapper.java    |  40 ----
 .../record/datatypes/WritableWrapper.java       |  71 -------
 .../datatypes/WritableWrapperConverter.java     |  45 -----
 .../mapred/record/example/WordCount.java        | 184 -----------------
 .../example/WordCountWithOutputFormat.java      | 173 ----------------
 .../record/HadoopRecordInputOutputITCase.java   |  54 -----
 15 files changed, 1536 deletions(-)
----------------------------------------------------------------------
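
Note for readers: the Record-API wrappers deleted below have Tuple2-based counterparts that remain in flink-hadoop-compatibility / flink-java. For reference only, a minimal sketch of reading a Hadoop mapred TextInputFormat through the DataSet API wrapper (package and class names assumed to match this Flink release; the input path is a placeholder):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ReadHadoopTextFile {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Wrap the mapred TextInputFormat; keys and values stay Hadoop Writables.
            HadoopInputFormat<LongWritable, Text> hadoopIF =
                new HadoopInputFormat<LongWritable, Text>(
                    new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
            TextInputFormat.addInputPath(hadoopIF.getJobConf(), new Path(args[0]));

            // Records arrive as Tuple2<key, value> elements instead of Record,
            // so no HadoopTypeConverter is involved.
            DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
            lines.print();
        }
    }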


http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
deleted file mode 100644
index 3b8064f..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-
-/**
- * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
- *
- * Example usage:
- * <pre>
- *             HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",reducer, Text.class,IntWritable.class);
- *             org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
- * </pre>
- *
- * Note that it is possible to provide custom data type converter.
- *
- * The HadoopDataSink provides a default converter: {@link org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter}
- **/
-public class HadoopDataSink<K,V> extends GenericDataSink {
-
-       private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
-
-       private JobConf jobConf;
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, jobConf, name, Collections.<Operator<Record>>singletonList(input), conv, keyClass, valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-
-
-       @SuppressWarnings("deprecation")
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-               super(new HadoopRecordOutputFormat<K,V>(hadoopFormat, jobConf, conv),input, name);
-               
-               if (hadoopFormat == null || jobConf == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.name = name;
-               this.jobConf = jobConf;
-               jobConf.setOutputKeyClass(keyClass);
-               jobConf.setOutputValueClass(valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-       public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-               this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-       }
-
-       public JobConf getJobConf() {
-               return this.jobConf;
-       }
-}
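
For the output side, the HadoopDataSink removed above corresponds to the Tuple2-based HadoopOutputFormat wrapper. A minimal sketch under the same assumptions (class names as expected in this Flink release; the output path and the calling program are placeholders):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class WriteHadoopTextFile {
        // Emits a DataSet of (Text, IntWritable) pairs through a wrapped mapred TextOutputFormat.
        public static void write(DataSet<Tuple2<Text, IntWritable>> result, String outputPath) {
            HadoopOutputFormat<Text, IntWritable> hadoopOF =
                new HadoopOutputFormat<Text, IntWritable>(
                    new TextOutputFormat<Text, IntWritable>(), new JobConf());
            TextOutputFormat.setOutputPath(hadoopOF.getJobConf(), new Path(outputPath));
            result.output(hadoopOF);
        }
    }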

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
deleted file mode 100644
index 508f069..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
- * 
- * Example usage:
- * <pre>
- *             HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines");
- *             org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
- * </pre>
- * 
- * Note that it is possible to provide custom data type converter.
- * 
- * The HadoopDataSource provides two different standard converters:
- * * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper
- * * DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Flinks's {@link org.apache.flink.types.Value} types.
- */
-public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
-
-       private static String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";
-       
-       private JobConf jobConf;
-       
-       /**
-        * 
-        * @param hadoopFormat Implementation of a Hadoop input format
-        * @param jobConf JobConf object (Hadoop)
-        * @param name Name of the DataSource
-        * @param conv Definition of a custom type converter {@link DefaultHadoopTypeConverter}.
-        */
-       public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
-               super(new HadoopRecordInputFormat<K,V>(hadoopFormat, jobConf, conv),name);
-               
-               if (hadoopFormat == null || jobConf == null || conv == null) {
-                       throw new NullPointerException();
-               }
-               
-               this.name = name;
-               this.jobConf = jobConf;
-       }
-       
-       public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
-               this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() );
-       }
-       public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
-               this(hadoopFormat, jobConf, DEFAULT_NAME);
-       }
-       
-       public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
-               this(hadoopFormat, new JobConf(), DEFAULT_NAME);
-       }
-
-       public JobConf getJobConf() {
-               return this.jobConf;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
deleted file mode 100644
index edcc43b..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, HadoopInputSplit> {
-
-       private static final long serialVersionUID = 1L;
-
-       public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
-       public HadoopTypeConverter<K,V> converter;
-       private String hadoopInputFormatName;
-       public JobConf jobConf;
-       public transient K key;
-       public transient V value;
-       public RecordReader<K, V> recordReader;
-       private boolean fetched = false;
-       private boolean hasNext;
-               
-       public HadoopRecordInputFormat() {
-               super();
-       }
-       
-       public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
-               super();
-               this.hadoopInputFormat = hadoopInputFormat;
-               this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
-               this.converter = conv;
-               HadoopUtils.mergeHadoopConf(job);
-               this.jobConf = job;
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-               
-       }
-
-       @Override
-       public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-               return null;
-       }
-
-       @Override
-       public HadoopInputSplit[] createInputSplits(int minNumSplits)
-                       throws IOException {
-               org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
-               HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-               for(int i=0;i<splitArray.length;i++){
-                       hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
-               }
-               return hiSplit;
-       }
-
-       @Override
-       public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-               return new DefaultInputSplitAssigner(inputSplits);
-       }
-
-       @Override
-       public void open(HadoopInputSplit split) throws IOException {
-               this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-               key = this.recordReader.createKey();
-               value = this.recordReader.createValue();
-               this.fetched = false;
-       }
-
-       private void fetchNext() throws IOException {
-               hasNext = this.recordReader.next(key, value);
-               fetched = true;
-       }
-       
-       @Override
-       public boolean reachedEnd() throws IOException {
-               if(!fetched) {
-                       fetchNext();
-               }
-               return !hasNext;
-       }
-
-       @Override
-       public Record nextRecord(Record record) throws IOException {
-               if(!fetched) {
-                       fetchNext();
-               }
-               if(!hasNext) {
-                       return null;
-               }
-               converter.convert(record, key, value);
-               fetched = false;
-               return record;
-       }
-
-       @Override
-       public void close() throws IOException {
-               this.recordReader.close();
-       }
-       
-       /**
-        * Custom serialization methods.
-        *  @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html"
-        */
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(hadoopInputFormatName);
-               jobConf.write(out);
-               out.writeObject(converter);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-               hadoopInputFormatName = in.readUTF();
-               if(jobConf == null) {
-                       jobConf = new JobConf();
-               }
-               jobConf.readFields(in);
-               try {
-                       this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-               }
-               ReflectionUtils.setConf(hadoopInputFormat, jobConf);
-               converter = (HadoopTypeConverter<K,V>) in.readObject();
-       }
-       
-       public void setJobConf(JobConf job) {
-               this.jobConf = job;
-       }
-               
-
-       public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
-               return hadoopInputFormat;
-       }
-
-       public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
-               this.hadoopInputFormat = hadoopInputFormat;
-       }
-       
-       public JobConf getJobConf() {
-               return jobConf;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
deleted file mode 100644
index e519062..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-public class HadoopRecordOutputFormat<K,V> implements OutputFormat<Record> {
-
-       private static final long serialVersionUID = 1L;
-
-       public JobConf jobConf;
-
-       public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat;
-
-       private String hadoopOutputFormatName;
-
-       public RecordWriter<K,V> recordWriter;
-
-       public FlinkTypeConverter<K,V> converter;
-
-       public HadoopFileOutputCommitter fileOutputCommitterWrapper;
-
-       public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) {
-               super();
-               this.hadoopOutputFormat = hadoopFormat;
-               this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
-               this.converter = conv;
-               this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter();
-               HadoopUtils.mergeHadoopConf(job);
-               this.jobConf = job;
-       }
-
-       @Override
-       public void configure(Configuration parameters) {
-       }
-
-       /**
-        * create the temporary output file for hadoop RecordWriter.
-        * @param taskNumber The number of the parallel instance.
-        * @param numTasks The number of parallel tasks.
-        * @throws IOException
-        */
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               this.fileOutputCommitterWrapper.setupJob(this.jobConf);
-               if (Integer.toString(taskNumber + 1).length() <= 6) {
-                       this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0");
-                       //compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-                       this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString());
-               } else {
-                       throw new IOException("task id too large");
-               }
-               this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
-       }
-
-
-       @Override
-       public void writeRecord(Record record) throws IOException {
-               K key = this.converter.convertKey(record);
-               V value = this.converter.convertValue(record);
-               this.recordWriter.write(key, value);
-       }
-
-       /**
-        * commit the task by moving the output file out from the temporary directory.
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               this.recordWriter.close(new HadoopDummyReporter());
-               if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) {
-                       this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")));
-               }
-       //TODO: commitjob when all the tasks are finished
-       }
-
-
-       /**
-        * Custom serialization methods.
-        *  @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html"
-        */
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.writeUTF(hadoopOutputFormatName);
-               jobConf.write(out);
-               out.writeObject(converter);
-               out.writeObject(fileOutputCommitterWrapper);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-               hadoopOutputFormatName = in.readUTF();
-               if(jobConf == null) {
-                       jobConf = new JobConf();
-               }
-               jobConf.readFields(in);
-               try {
-                       this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance();
-               } catch (Exception e) {
-                       throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-               }
-               ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
-               converter = (FlinkTypeConverter<K,V>) in.readObject();
-               fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject();
-       }
-
-
-       public void setJobConf(JobConf job) {
-               this.jobConf = job;
-       }
-
-       public JobConf getJobConf() {
-               return jobConf;
-       }
-
-       public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
-               return hadoopOutputFormat;
-       }
-
-       public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) {
-               this.hadoopOutputFormat = hadoopOutputFormat;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
deleted file mode 100644
index 9d37988..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-/**
- * Convert Flink Record into the default hadoop writables.
- */
-public class DefaultFlinkTypeConverter<K,V> implements FlinkTypeConverter<K,V> {
-       private static final long serialVersionUID = 1L;
-
-       private Class<K> keyClass;
-       private Class<V> valueClass;
-
-       public DefaultFlinkTypeConverter(Class<K> keyClass, Class<V> valueClass) {
-               this.keyClass= keyClass;
-               this.valueClass = valueClass;
-       }
-       @Override
-       public K convertKey(Record flinkRecord) {
-               if(flinkRecord.getNumFields() > 0) {
-                       return convert(flinkRecord, 0, this.keyClass);
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       public V convertValue(Record flinkRecord) {
-               if(flinkRecord.getNumFields() > 1) {
-                       return convert(flinkRecord, 1, this.valueClass);
-               } else {
-                       return null;
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       private<T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
-               if(hadoopType == LongWritable.class ) {
-                       return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.Text.class) {
-                       return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.IntWritable.class) {
-                       return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.FloatWritable.class) {
-                       return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
-                       return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
-                       return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
-               }
-               if(hadoopType == org.apache.hadoop.io.ByteWritable.class) {
-                       return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
-               }
-
-               throw new RuntimeException("Unable to convert Flink type ("+flinkType.getClass().getCanonicalName()+") to Hadoop.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
deleted file mode 100644
index 6ed670a..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-
-/**
- * Converter for the default hadoop writables.
- * Key will be in field 0, Value in field 1 of a Record.
- */
-public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
-               flinkRecord.setField(0, convert(hadoopKey));
-               flinkRecord.setField(1, convert(hadoopValue));
-       }
-       
-       protected Value convert(Object hadoopType) {
-               if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) {
-                       return new LongValue(((LongWritable)hadoopType).get());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.Text) {
-                       return new StringValue(((Text)hadoopType).toString());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.IntWritable) {
-                       return new IntValue(((IntWritable)hadoopType).get());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.FloatWritable) {
-                       return new FloatValue(((FloatWritable)hadoopType).get());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.DoubleWritable) {
-                       return new DoubleValue(((DoubleWritable)hadoopType).get());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.BooleanWritable) {
-                       return new BooleanValue(((BooleanWritable)hadoopType).get());
-               }
-               if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) {
-                       return new ByteValue(((ByteWritable)hadoopType).get());
-               }
-               if (hadoopType instanceof NullWritable) {
-                       return NullValue.getInstance();
-               }
-
-               throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to a Flink data type.");
-       }
-}
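
Where DefaultHadoopTypeConverter mapped Writables into Flink Value types inside a Record, the Tuple2-based API leaves that step to user code. A hypothetical MapFunction doing the equivalent conversion to plain Java types (names chosen purely for illustration):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class WritableToJavaTypes {
        // Turns the Tuple2<LongWritable, Text> records produced by the HadoopInputFormat
        // wrapper into Tuple2<Long, String> values.
        public static DataSet<Tuple2<Long, String>> convert(DataSet<Tuple2<LongWritable, Text>> input) {
            return input.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, String>>() {
                @Override
                public Tuple2<Long, String> map(Tuple2<LongWritable, Text> value) {
                    return new Tuple2<Long, String>(value.f0.get(), value.f1.toString());
                }
            });
        }
    }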

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
deleted file mode 100644
index 3c14a86..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.Serializable;
-
-import org.apache.flink.types.Record;
-
-/**
- * An interface describing a class that is able to
- * convert Flink's Record into Hadoop types model.
- *
- * The converter must be Serializable.
- *
- * Flink provides a DefaultFlinkTypeConverter. Custom implementations should
- * chain the type converters.
- */
-public interface FlinkTypeConverter<K,V> extends Serializable {
-
-       /**
-        * Convert a Flink type to a Hadoop type.
-        */
-       public K convertKey(Record record);
-
-       public V convertValue(Record record);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
deleted file mode 100644
index ce4955c..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext}
- * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public.
- * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks.
- */
-public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-       
-       static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-               "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
-       public void setupJob(JobConf conf) throws IOException {
-               Path outputPath = FileOutputFormat.getOutputPath(conf);
-               if (outputPath != null) {
-                       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-                       FileSystem fileSys = tmpDir.getFileSystem(conf);
-                       if (!fileSys.mkdirs(tmpDir)) {
-                               LOG.error("Mkdirs failed to create " + tmpDir.toString());
-                       }
-               }
-       }
-
-       private static boolean getOutputDirMarking(JobConf conf) {
-               return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,  true);
-       }
-
-       private void markSuccessfulOutputDir(JobConf conf)
-               throws IOException {
-               Path outputPath = FileOutputFormat.getOutputPath(conf);
-               if (outputPath != null) {
-                       FileSystem fileSys = outputPath.getFileSystem(conf);
-                       // create a file in the folder to mark it
-                       if (fileSys.exists(outputPath)) {
-                               Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-                               fileSys.create(filePath).close();
-                       }
-               }
-       }
-
-       private Path getFinalPath(Path jobOutputDir, Path taskOutput,
-                                                       Path taskOutputPath) throws IOException {
-               URI taskOutputUri = taskOutput.toUri();
-               URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-               if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
-                       throw new IOException("Can not get the relative path: base = " +
-                               taskOutputPath + " child = " + taskOutput);
-               }
-               if (relativePath.getPath().length() > 0) {
-                       return new Path(jobOutputDir, relativePath.getPath());
-               } else {
-                       return jobOutputDir;
-               }
-       }
-       private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID,
-                                                               FileSystem fs,
-                                                               Path jobOutputDir,
-                                                               Path taskOutput)
-               throws IOException {
-               if (fs.isFile(taskOutput)) {
-                       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-                               getTempTaskOutputPath(conf, taskAttemptID));
-                       if (!fs.rename(taskOutput, finalOutputPath)) {
-                               if (!fs.delete(finalOutputPath, true)) {
-                                       throw new IOException("Failed to delete earlier output of task: " +
-                                               taskAttemptID);
-                               }
-                               if (!fs.rename(taskOutput, finalOutputPath)) {
-                                       throw new IOException("Failed to save output of task: " +
-                                               taskAttemptID);
-                               }
-                       }
-                       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-               } else if(fs.getFileStatus(taskOutput).isDir()) {
-                       FileStatus[] paths = fs.listStatus(taskOutput);
-                       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-                               getTempTaskOutputPath(conf, taskAttemptID));
-                       fs.mkdirs(finalOutputPath);
-                       if (paths != null) {
-                               for (FileStatus path : paths) {
-                                       moveTaskOutputs(conf,taskAttemptID, fs, jobOutputDir, path.getPath());
-                               }
-                       }
-               }
-       }
-
-       public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
-               throws IOException {
-               Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-               if (taskOutputPath != null) {
-                       FileSystem fs = taskOutputPath.getFileSystem(conf);
-                       if (fs.exists(taskOutputPath)) {
-                               Path jobOutputPath = taskOutputPath.getParent().getParent();
-                               // Move the task outputs to their final place
-                               moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
-                               // Delete the temporary task-specific output directory
-                               if (!fs.delete(taskOutputPath, true)) {
-                                       LOG.info("Failed to delete the temporary output" +
-                                               " directory of task: " + taskAttemptID + " - " + taskOutputPath);
-                               }
-                               LOG.info("Saved output of task '" + taskAttemptID + "' to " +
-                                       jobOutputPath);
-                       }
-               }
-       }
-       public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
-               throws IOException {
-               try {
-                       Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-                       if (taskOutputPath != null) {
-                               // Get the file-system for the task output directory
-                               FileSystem fs = taskOutputPath.getFileSystem(conf);
-                               // since task output path is created on demand,
-                               // if it exists, task needs a commit
-                               if (fs.exists(taskOutputPath)) {
-                                       return true;
-                               }
-                       }
-               } catch (IOException  ioe) {
-                       throw ioe;
-               }
-               return false;
-       }
-
-       public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
-               Path outputPath = FileOutputFormat.getOutputPath(conf);
-               if (outputPath != null) {
-                       Path p = new Path(outputPath,
-                               (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-                                       "_" + taskAttemptID.toString()));
-                       try {
-                               FileSystem fs = p.getFileSystem(conf);
-                               return p.makeQualified(fs);
-                       } catch (IOException ie) {
-                               LOG.warn(StringUtils.stringifyException(ie));
-                               return p;
-                       }
-               }
-               return null;
-       }
-       public void cleanupJob(JobConf conf) throws IOException {
-               // do the clean up of temporary directory
-               Path outputPath = FileOutputFormat.getOutputPath(conf);
-               if (outputPath != null) {
-                       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-                       FileSystem fileSys = tmpDir.getFileSystem(conf);
-                       if (fileSys.exists(tmpDir)) {
-                               fileSys.delete(tmpDir, true);
-                       }
-               } else {
-                       LOG.warn("Output path is null in cleanup");
-               }
-       }
-
-       public void commitJob(JobConf conf) throws IOException {
-               cleanupJob(conf);
-               if (getOutputDirMarking(conf)) {
-                       markSuccessfulOutputDir(conf);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
deleted file mode 100644
index 6bbf077..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.Serializable;
-
-import org.apache.flink.types.Record;
-
-
-/**
- * An interface describing a class that is able to 
- * convert Hadoop types into Flink's Record model.
- * 
- * The converter must be Serializable.
- * 
- * Flink provides a DefaultHadoopTypeConverter. Custom implementations should
- * chain the type converters.
- */
-public interface HadoopTypeConverter<K, V> extends Serializable {
-       
-       /**
-        * Convert a Hadoop type to a Flink type.
-        */
-       public void convert(Record record, K hadoopKey, V hadoopValue);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
deleted file mode 100644
index 0519ac9..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.Key;
-import org.apache.hadoop.io.WritableComparable;
-
-public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> {
-       private static final long serialVersionUID = 1L;
-       
-       public WritableComparableWrapper() {
-               super();
-       }
-       
-       public WritableComparableWrapper(T toWrap) {
-               super(toWrap);
-       }
-
-       @Override
-       public int compareTo(WritableComparableWrapper<T> o) {
-               return super.value().compareTo(o.value());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
deleted file mode 100644
index 6369bb7..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-
-public class WritableWrapper<T extends Writable> implements Value {
-       private static final long serialVersionUID = 2L;
-       
-       private T wrapped;
-       private String wrappedType;
-       private ClassLoader cl;
-       
-       public WritableWrapper() {
-       }
-       
-       public WritableWrapper(T toWrap) {
-               wrapped = toWrap;
-               wrappedType = toWrap.getClass().getCanonicalName();
-       }
-
-       public T value() {
-               return wrapped;
-       }
-       
-       @Override
-       public void write(DataOutputView out) throws IOException {
-               out.writeUTF(wrappedType);
-               wrapped.write(out);
-       }
-
-       @Override
-       public void read(DataInputView in) throws IOException {
-               if(cl == null) {
-                       cl = Thread.currentThread().getContextClassLoader();
-               }
-               wrappedType = in.readUTF();
-               try {
-                       @SuppressWarnings("unchecked")
-                       Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class);
-                       wrapped = InstantiationUtil.instantiate(wrClass, Writable.class);
-               } catch (ClassNotFoundException e) {
-                       throw new RuntimeException("Error creating the WritableWrapper", e);
-               }
-               wrapped.readFields(in);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
deleted file mode 100644
index 6bb13d1..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-@SuppressWarnings("rawtypes")
-public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
-       private static final long serialVersionUID = 1L;
-
-       @Override
-       public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
-               flinkRecord.setField(0, convertKey(hadoopKey));
-               flinkRecord.setField(1, convertValue(hadoopValue));
-       }
-       
-       @SuppressWarnings("unchecked")
-       private final Value convertKey(K in) {
-               return new WritableComparableWrapper(in);
-       }
-       
-       private final Value convertValue(V in) {
-               return new WritableWrapper<V>(in);
-       }
-}
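
The two wrapper classes and the converter removed above existed only because the Record API could not carry Hadoop types as record fields. For orientation, a minimal sketch of how the DataSet API uses Writables as ordinary element types, assuming Flink's built-in Writable type support; the class below is illustrative and not part of this commit:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.Text;

public class DirectWritableUsage {

	// Text implements WritableComparable, so it can be grouped on directly,
	// without WritableWrapper or WritableComparableWrapper.
	public static DataSet<Tuple2<Text, Integer>> sumPerWord(DataSet<Tuple2<Text, Integer>> pairs) {
		return pairs.groupBy(0).sum(1);
	}
}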

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
deleted file mode 100644
index b167080..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file. 
- * 
- * <br /><br />
- * 
- * <b>Note</b>: This example uses the out-dated Record API.
- * It is recommended to use the new Java API.
- * 
- * @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount
- */
-public class WordCount implements Program, ProgramDescription {
-       
-       private static final long serialVersionUID = 1L;
-
-
-       /**
-        * Converts a Record containing one string in to multiple string/integer pairs.
-        * The string is tokenized by whitespaces. For each token a new record is emitted,
-        * where the token is the first field and an Integer(1) is the second field.
-        */
-       public static class TokenizeLine extends MapFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void map(Record record, Collector<Record> collector) {
-                       // get the first field (as type StringValue) from the record
-                       String line = record.getField(1, StringValue.class).getValue();
-                       // normalize the line
-                       line = line.replaceAll("\\W+", " ").toLowerCase();
-                       
-                       // tokenize the line
-                       StringTokenizer tokenizer = new StringTokenizer(line);
-                       while (tokenizer.hasMoreTokens()) {
-                               String word = tokenizer.nextToken();
-                               
-                               // we emit a (word, 1) pair 
-                               collector.collect(new Record(new StringValue(word), new IntValue(1)));
-                       }
-               }
-       }
-
-       /**
-        * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-        * in the record. The other fields are not modified.
-        */
-       @Combinable
-       @ConstantFields(0)
-       public static class CountWords extends ReduceFunction implements Serializable {
-               
-               private static final long serialVersionUID = 1L;
-               
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       Record element = null;
-                       int sum = 0;
-
-                       while (records.hasNext()) {
-                               element = records.next();
-                               int cnt = element.getField(1, IntValue.class).getValue();
-                               sum += cnt;
-                       }
-
-                       element.setField(1, new IntValue(sum));
-                       out.collect(element);
-               }
-               
-               @Override
-               public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       // the logic is the same as in the reduce function, so simply call the reduce method
-                       reduce(records, out);
-               }
-       }
-
-
-       @SuppressWarnings({ "rawtypes", "unchecked", "unused" })
-       @Override
-       public Plan getPlan(String... args) {
-               // parse job parameters
-               int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String dataInput = (args.length > 1 ? args[1] : "");
-               String output    = (args.length > 2 ? args[2] : "");
-               
-               
-               HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
-               TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
-               // Example with Wrapper Converter
-               HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(
-                               new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
-               TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-               
-               MapOperator mapper = MapOperator.builder(new TokenizeLine())
-                       .input(source)
-                       .name("Tokenize Lines")
-                       .build();
-               ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-                       .input(mapper)
-                       .name("Count Words")
-                       .build();
-               FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts");
-               CsvOutputFormat.configureRecordFormat(out)
-                       .recordDelimiter('\n')
-                       .fieldDelimiter(' ')
-                       .field(StringValue.class, 0)
-                       .field(IntValue.class, 1);
-               
-               Plan plan = new Plan(out, "WordCount Example");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
-       }
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubStasks] [input] [output]";
-       }
-
-       
-       public static void main(String[] args) throws Exception {
-               WordCount wc = new WordCount();
-               
-               if (args.length < 3) {
-                       System.err.println(wc.getDescription());
-                       System.exit(1);
-               }
-               
-               Plan plan = wc.getPlan(args);
-               
-               // This will execute the word-count embedded in a local context. replace this line by the commented
-               // succeeding line to send the job to a local installation or to a cluster for execution
-               LocalExecutor.execute(plan);
-//             PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//             ex.executePlan(plan);
-       }
-}
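
The Javadoc of the deleted example points to the new Java API. As a rough, non-authoritative equivalent, the same word count can be expressed with the DataSet API and the mapred HadoopInputFormat wrapper; the package org.apache.flink.api.java.hadoop.mapred and the exact constructor signature are assumptions, not something this commit adds:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class DataSetApiWordCount {

	public static void main(String[] args) throws Exception {
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Read lines through the Hadoop mapred TextInputFormat, wrapped as a Flink InputFormat.
		HadoopInputFormat<LongWritable, Text> input = new HadoopInputFormat<LongWritable, Text>(
				new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
		TextInputFormat.addInputPath(input.getJobConf(), new Path(args[0]));

		DataSet<Tuple2<String, Integer>> counts = env.createInput(input)
				// tokenize each line into (word, 1) pairs
				.flatMap(new FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>>() {
					@Override
					public void flatMap(Tuple2<LongWritable, Text> line, Collector<Tuple2<String, Integer>> out) {
						for (String word : line.f1.toString().toLowerCase().split("\\W+")) {
							if (!word.isEmpty()) {
								out.collect(new Tuple2<String, Integer>(word, 1));
							}
						}
					}
				})
				// group by word and sum the counts
				.groupBy(0)
				.sum(1);

		counts.writeAsCsv(args[1], "\n", " ");
		env.execute("DataSet API WordCount");
	}
}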

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
deleted file mode 100644
index a838215..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.record.example;
-
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.client.LocalExecutor;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink;
-import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * the occurrences of each word in the file.
- * 
- * <br /><br />
- * 
- * <b>Note</b>: This example uses the out dated Record API.
- * It is recommended to use the new Java API.
- * 
- * @see WordCount
- */
-@SuppressWarnings("serial")
-public class WordCountWithOutputFormat implements Program, ProgramDescription {
-
-       /**
-        * Converts a Record containing one string in to multiple string/integer pairs.
-        * The string is tokenized by whitespaces. For each token a new record is emitted,
-        * where the token is the first field and an Integer(1) is the second field.
-        */
-       public static class TokenizeLine extends MapFunction implements Serializable {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void map(Record record, Collector<Record> collector) {
-                       // get the first field (as type StringValue) from the record
-                       String line = record.getField(1, StringValue.class).getValue();
-                       // normalize the line
-                       line = line.replaceAll("\\W+", " ").toLowerCase();
-
-                       // tokenize the line
-                       StringTokenizer tokenizer = new StringTokenizer(line);
-                       while (tokenizer.hasMoreTokens()) {
-                               String word = tokenizer.nextToken();
-
-                               // we emit a (word, 1) pair 
-                               collector.collect(new Record(new StringValue(word), new IntValue(1)));
-                       }
-               }
-       }
-
-       /**
-        * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
-        * in the record. The other fields are not modified.
-        */
-       @Combinable
-       @ConstantFields(0)
-       public static class CountWords extends ReduceFunction implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       Record element = null;
-                       int sum = 0;
-
-                       while (records.hasNext()) {
-                               element = records.next();
-                               int cnt = element.getField(1, IntValue.class).getValue();
-                               sum += cnt;
-                       }
-
-                       element.setField(1, new IntValue(sum));
-                       out.collect(element);
-               }
-
-               @Override
-               public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-                       // the logic is the same as in the reduce function, so simply call the reduce method
-                       reduce(records, out);
-               }
-       }
-
-
-       @Override
-       public Plan getPlan(String... args) {
-               // parse job parameters
-               int numSubTasks   = (args.length > 0 ? Integer.parseInt(args[0]) : 1);
-               String dataInput = (args.length > 1 ? args[1] : "");
-               String output    = (args.length > 2 ? args[2] : "");
-
-               HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>(
-                               new TextInputFormat(), new JobConf(), "Input Lines");
-               TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
-
-
-               MapOperator mapper = MapOperator.builder(new TokenizeLine())
-                               .input(source)
-                               .name("Tokenize Lines")
-                               .build();
-               ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0)
-                               .input(mapper)
-                               .name("Count Words")
-                               .build();
-               HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(),new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class);
-               TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
-
-               Plan plan = new Plan(out, "Hadoop OutputFormat Example");
-               plan.setDefaultParallelism(numSubTasks);
-               return plan;
-       }
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: [numSubStasks] [input] [output]";
-       }
-
-
-       public static void main(String[] args) throws Exception {
-               WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-
-               if (args.length < 3) {
-                       System.err.println(wc.getDescription());
-                       System.exit(1);
-               }
-
-               Plan plan = wc.getPlan(args);
-
-               // This will execute the word-count embedded in a local context. replace this line by the commented
-               // succeeding line to send the job to a local installation or to a cluster for execution
-               LocalExecutor.execute(plan);
-//             PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar");
-//             ex.executePlan(plan);
-       }
-}
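
On the output side, a hedged sketch of what replaces HadoopDataSink: writing (word, count) pairs through the mapred TextOutputFormat via the HadoopOutputFormat wrapper of the DataSet API. Package and constructor details are assumptions based on the non-Record Hadoop compatibility classes, not part of this diff:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopTextOutputSketch {

	// Attaches a Hadoop TextOutputFormat sink to a word-count result; the job runs when the
	// surrounding program calls env.execute().
	public static void sink(DataSet<Tuple2<String, Integer>> counts, String outputPath) {
		// Convert to the Writable types expected by TextOutputFormat<Text, IntWritable>.
		DataSet<Tuple2<Text, IntWritable>> hadoopResult = counts.map(
				new MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>>() {
					@Override
					public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> in) {
						return new Tuple2<Text, IntWritable>(new Text(in.f0), new IntWritable(in.f1));
					}
				});

		JobConf jobConf = new JobConf();
		HadoopOutputFormat<Text, IntWritable> outputFormat =
				new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), jobConf);
		TextOutputFormat.setOutputPath(jobConf, new Path(outputPath));

		hadoopResult.output(outputFormat);
	}
}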

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
deleted file mode 100644
index fe7ea8e..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.hadoopcompatibility.mapred.record;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-/**
- * test the hadoop inputformat and outputformat
- */
-public class HadoopRecordInputOutputITCase extends RecordAPITestBase {
-       protected String textPath;
-       protected String resultPath;
-       protected String counts;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               textPath = createTempFile("text.txt", WordCountData.TEXT);
-               resultPath = getTempDirPath("result");
-               counts = WordCountData.COUNTS.replaceAll(" ", "\t");
-       }
-
-       @Override
-       protected Plan getTestJob() {
-               //WordCountWithHadoopOutputFormat takes hadoop TextInputFormat as input and output file in hadoop TextOutputFormat
-               WordCountWithOutputFormat wc = new WordCountWithOutputFormat();
-               return wc.getPlan("1", textPath, resultPath);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               // Test results, append /1 to resultPath due to the generated _temproray file.
-               compareResultsByLinesInMemory(counts, resultPath + "/1");
-       }
-}
