Repository: flink
Updated Branches:
  refs/heads/release-0.8 944e2e3d5 -> cd2f88afd


http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..236d149
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.hadoop.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+       public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, 
T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+               public RecordReader<Void, T> createRecordReader(InputSplit 
inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+                       final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+                       HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, 
Long.class, job);
+
+                       TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
+                       TypeInformation<Tuple2<Void,Long>> testTupleType = new 
TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO);
+                       
+                       if(tupleType.isTupleType()) {
+                               
if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+                                       fail("Tuple type information was not 
set correctly!");
+                               }
+                       } else {
+                               fail("Type information was not set to tuple 
type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + 
ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 43f8609..089412e 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -25,16 +25,23 @@ import org.apache.flink.api.java.io._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.scala.hadoop.mapred
+import org.apache.flink.api.scala.hadoop.mapreduce
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
 
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
-CollectionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, 
CollectionEnvironment}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
 
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.types.StringValue
 import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
+import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat,
+InputFormat => MapredInputFormat, JobConf}
+import org.apache.hadoop.fs.{Path => HadoopPath}
+
 
 import scala.collection.JavaConverters._
 
@@ -269,6 +276,92 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a [[DataSet]] from the given 
[[org.apache.hadoop.mapred.FileInputFormat]]. The
+   * given inputPath is set on the given job.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val result = createHadoopInput(mapredInputFormat, key, value, job)
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    result
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given 
[[org.apache.hadoop.mapred.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given 
[[org.apache.hadoop.mapred.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val hadoopInputFormat = new mapred.HadoopInputFormat[K, 
V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given 
[[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+   * The given inputPath is set on the given job.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val result = createHadoopInput(mapredInputFormat, key, value, job)
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    result
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given
+   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be 
created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given 
[[org.apache.hadoop.mapreduce.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
+    val hadoopInputFormat =
+      new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
    * Creates a DataSet from the given non-empty [[Seq]]. The elements need to 
be serializable
    * because the framework may move the elements into the cluster if needed.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
new file mode 100644
index 0000000..5170d14
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
+import org.apache.hadoop.mapred.{JobConf, InputFormat}
+
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: JobConf)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, 
valueClass, job) {
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      return null
+    }
+    fetched = false
+    (key, value)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
new file mode 100644
index 0000000..180a8bf
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: 
JobConf)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
new file mode 100644
index 0000000..cafbdcb
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
+import org.apache.hadoop.mapreduce.{InputFormat, Job}
+
+class HadoopInputFormat[K, V](
+    mapredInputFormat: InputFormat[K, V],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    job: Job)
+  extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, 
valueClass, job) {
+
+  def nextRecord(reuse: (K, V)): (K, V) = {
+    if (!fetched) {
+      fetchNext()
+    }
+    if (!hasNext) {
+      return null
+    }
+    fetched = false
+    (recordReader.getCurrentKey, recordReader.getCurrentValue)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
new file mode 100644
index 0000000..51db9de
--- /dev/null
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
+import org.apache.hadoop.mapreduce.{Job, OutputFormat}
+
+class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: 
Job)
+  extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) {
+
+  def writeRecord(record: (K, V)) {
+    this.recordWriter.write(record._1, record._2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 3ee7a26..b94283d 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -116,12 +116,12 @@ under the License.
                        <scope>test</scope>
                </dependency>
                
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-                       <scope>test</scope>
-               </dependency>
+               <!--<dependency>-->
+                       <!--<groupId>com.google.guava</groupId>-->
+                       <!--<artifactId>guava</artifactId>-->
+                       <!--<version>${guava.version}</version>-->
+                       <!--<scope>test</scope>-->
+               <!--</dependency>-->
                
                <dependency>
                        <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
new file mode 100644
index 0000000..037610e
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapred;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+public class WordCountMapredITCase extends JavaProgramTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       public WordCountMapredITCase(){
+//             setDegreeOfParallelism(4);
+//             setNumTaskManagers(2);
+//             setTaskManagerNumSlots(2);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[] {".", "_"});
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+//             env.setDegreeOfParallelism(1);
+
+
+               DataSet<Tuple2<LongWritable, Text>> input = 
env.readHadoopFile(new TextInputFormat(),
+                               LongWritable.class, Text.class, textPath);
+
+               DataSet<String> text = input.map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
+                       @Override
+                       public String map(Tuple2<LongWritable, Text> value) 
throws Exception {
+                               return value.f1.toString();
+                       }
+               });
+
+
+               DataSet<Tuple2<String, Integer>> counts =
+                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                                               // group by the tuple field "0" 
and sum up tuple field "1"
+                                               .groupBy(0)
+                                               .sum(1);
+
+               DataSet<Tuple2<Text, LongWritable>> words = counts.map(new 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+                       @Override
+                       public Tuple2<Text, LongWritable> map(Tuple2<String, 
Integer> value) throws Exception {
+                               return new Tuple2<Text, LongWritable>(new 
Text(value.f0), new LongWritable(value.f1));
+                       }
+               });
+
+               // Set up Hadoop Output Format
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), new JobConf());
+               
hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+               TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), 
new Path(resultPath));
+
+               // Output & Execute
+               words.output(hadoopOutputFormat);
+               env.execute("Hadoop Compat WordCount");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
new file mode 100644
index 0000000..3bdaa22
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapreduce;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class WordCountMapreduceITCase extends JavaProgramTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       public WordCountMapreduceITCase(){
+//             setDegreeOfParallelism(4);
+//             setNumTaskManagers(2);
+//             setTaskManagerNumSlots(2);
+       }
+
+       @Override
+       protected void preSubmit() throws Exception {
+               textPath = createTempFile("text.txt", WordCountData.TEXT);
+               resultPath = getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, 
new String[] {".", "_"});
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+
+               DataSet<Tuple2<LongWritable, Text>> input = 
env.readHadoopFile(new TextInputFormat(),
+                               LongWritable.class, Text.class, textPath);
+
+               DataSet<String> text = input.map(new 
MapFunction<Tuple2<LongWritable, Text>, String>() {
+                       @Override
+                       public String map(Tuple2<LongWritable, Text> value) 
throws Exception {
+                               return value.f1.toString();
+                       }
+               });
+
+
+               DataSet<Tuple2<String, Integer>> counts =
+                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
+                               text.flatMap(new Tokenizer())
+                                               // group by the tuple field "0" 
and sum up tuple field "1"
+                                               .groupBy(0)
+                                               .sum(1);
+
+               DataSet<Tuple2<Text, LongWritable>> words = counts.map(new 
MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+                       @Override
+                       public Tuple2<Text, LongWritable> map(Tuple2<String, 
Integer> value) throws Exception {
+                               return new Tuple2<Text, LongWritable>(new 
Text(value.f0), new LongWritable(value.f1));
+                       }
+               });
+
+               // Set up Hadoop Output Format
+               Job job = Job.getInstance();
+               HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+                               new HadoopOutputFormat<Text, LongWritable>(new 
TextOutputFormat<Text, LongWritable>(), job);
+               job.getConfiguration().set("mapred.textoutputformat.separator", 
" ");
+               TextOutputFormat.setOutputPath(job, new Path(resultPath));
+
+               // Output & Execute
+               words.output(hadoopOutputFormat);
+               env.execute("Hadoop Compat WordCount");
+       }
+
+       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+
+               @Override
+               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
+                       // normalize and split the line
+                       String[] tokens = value.toLowerCase().split("\\W+");
+
+                       // emit the pairs
+                       for (String token : tokens) {
+                               if (token.length() > 0) {
+                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
new file mode 100644
index 0000000..25c878f
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, 
TextInputFormat}
+
+/** Word count that reads and writes through Hadoop's `mapred` text formats. */
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  /** Creates the temp input file from WordCountData.TEXT and picks the result path. */
+  protected override def preSubmit(): Unit = {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  /** Compares what was written to resultPath with WordCountData.COUNTS. */
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+  }
+
+  /** Assembles the word-count program and executes it. */
+  protected def testProgram(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val records =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+    // keep only the Text half of each record, then tokenize and count
+    val lines = records.map(record => record._2.toString)
+    val tokens = lines.flatMap(line => line.toLowerCase.split("\\W+").filter(_.nonEmpty))
+    val counts = tokens.map(token => (token, 1)).groupBy(0).sum(1)
+
+    // wrap the results in Hadoop writables for the Hadoop output format
+    val words = counts.map(pair => (new Text(pair._1), new LongWritable(pair._2)))
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
+      new TextOutputFormat[Text, LongWritable], new JobConf)
+    val outputConf = hadoopOutputFormat.getJobConf
+    outputConf.set("mapred.textoutputformat.separator", " ")
+    FileOutputFormat.setOutputPath(outputConf, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
new file mode 100644
index 0000000..4178267
--- /dev/null
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, 
TextOutputFormat}
+
+/** Word count that reads and writes through Hadoop's `mapreduce` text formats. */
+class WordCountMapreduceITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  /** Creates the temp input file from WordCountData.TEXT and picks the result path. */
+  protected override def preSubmit(): Unit = {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  /** Compares what was written to resultPath with WordCountData.COUNTS. */
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+  }
+
+  /** Assembles the word-count program and executes it. */
+  protected def testProgram(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input =
+      env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+    // only the Text half of each record is used; it is lower-cased and split into words
+    val text = input map { _._2.toString }
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+    val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val job = Job.getInstance()
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
+      new TextOutputFormat[Text, LongWritable], job)
+    // the separator is a single space; keep the literal on one line
+    hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
+    FileOutputFormat.setOutputPath(job, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+

Reply via email to