Repository: flink Updated Branches: refs/heads/master 7bc78cbf9 -> 8b3805ba5
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java index 2b99fd2..f5758eb 100644 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java +++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java @@ -32,8 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat; -import org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; /** * Implements a word count which takes the input file and counts the number of http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java deleted file mode 100644 index 86b730f..0000000 --- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/utils/HadoopUtils.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapreduce.utils; - -import java.lang.reflect.Constructor; -import java.util.Map; - -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; - -public class HadoopUtils { - - /** - * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration. 
- */ - public static void mergeHadoopConf(Configuration configuration) { - Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - - for (Map.Entry<String, String> e : hadoopConf) { - configuration.set(e.getKey(), e.getValue()); - } - } - - public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { - try { - Class<?> clazz; - // for Hadoop 1.xx - if(JobContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); - } - Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class); - JobContext context = (JobContext) constructor.newInstance(configuration, jobId); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of JobContext."); - } - } - - public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { - try { - Class<?> clazz; - // for Hadoop 1.xx - if(JobContext.class.isInterface()) { - clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); - } - // for Hadoop 2.xx - else { - clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); - } - Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); - TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); - - return context; - } catch(Exception e) { - throw new Exception("Could not create instance of TaskAttemptContext."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java 
---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java deleted file mode 100644 index 7477c28..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapreduce.wrapper; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.core.io.LocatableInputSplit; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.mapreduce.JobContext; - - -public class HadoopInputSplit extends LocatableInputSplit { - - public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit; - public transient JobContext jobContext; - - private int splitNumber; - - public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() { - return mapreduceInputSplit; - } - - - public HadoopInputSplit() { - super(); - } - - - public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) { - this.splitNumber = splitNumber; - if(!(mapreduceInputSplit instanceof Writable)) { - throw new IllegalArgumentException("InputSplit must implement Writable interface."); - } - this.mapreduceInputSplit = mapreduceInputSplit; - this.jobContext = jobContext; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeInt(this.splitNumber); - out.writeUTF(this.mapreduceInputSplit.getClass().getName()); - Writable w = (Writable) this.mapreduceInputSplit; - w.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - this.splitNumber = in.readInt(); - String className = in.readUTF(); - - if(this.mapreduceInputSplit == null) { - try { - Class<? 
extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); - this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); - } catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - ((Writable)this.mapreduceInputSplit).readFields(in); - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeInt(this.splitNumber); - out.writeUTF(this.mapreduceInputSplit.getClass().getName()); - Writable w = (Writable) this.mapreduceInputSplit; - w.write(out); - - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - this.splitNumber=in.readInt(); - String className = in.readUTF(); - - if(this.mapreduceInputSplit == null) { - try { - Class<? extends org.apache.hadoop.io.Writable> inputSplit = - Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); - this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); - } catch (Exception e) { - throw new RuntimeException("Unable to create InputSplit", e); - } - } - ((Writable)this.mapreduceInputSplit).readFields(in); - } - - @Override - public int getSplitNumber() { - return this.splitNumber; - } - - @Override - public String[] getHostnames() { - try { - return this.mapreduceInputSplit.getLocations(); - } catch (IOException e) { - return new String[0]; - } catch (InterruptedException e) { - return new String[0]; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java deleted file mode 100644 index 32396b8..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopIOFormatsITCase.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.test.hadoopcompatibility.mapred; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.util.Collection; -import java.util.LinkedList; - -@RunWith(Parameterized.class) -public class HadoopIOFormatsITCase extends JavaProgramTestBase { - - private static int NUM_PROGRAMS = 2; - - private int curProgId = config.getInteger("ProgramId", -1); - private String[] resultPath; - private String[] expectedResult; - private String sequenceFileInPath; - private String sequenceFileInPathNull; - - public HadoopIOFormatsITCase(Configuration config) { - super(config); - } - - @Override - protected void preSubmit() throws Exception { - resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") }; - - File sequenceFile = createAndRegisterTempFile("seqFile"); - sequenceFileInPath = sequenceFile.toURI().toString(); - - // Create a sequence file - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - FileSystem fs = 
FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf); - Path path = new Path(sequenceFile.getAbsolutePath()); - - // ------------------ Long / Text Key Value pair: ------------ - int kvCount = 4; - - LongWritable key = new LongWritable(); - Text value = new Text(); - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass()); - for (int i = 0; i < kvCount; i ++) { - if(i == 1) { - // write key = 0 a bit more often. - for(int a = 0;a < 15; a++) { - key.set(i); - value.set(i+" - somestring"); - writer.append(key, value); - } - } - key.set(i); - value.set(i+" - somestring"); - writer.append(key, value); - } - } finally { - IOUtils.closeStream(writer); - } - - - // ------------------ Long / Text Key Value pair: ------------ - - File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey"); - sequenceFileInPathNull = sequenceFileNull.toURI().toString(); - path = new Path(sequenceFileInPathNull); - - LongWritable value1 = new LongWritable(); - SequenceFile.Writer writer1 = null; - try { - writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass()); - for (int i = 0; i < kvCount; i ++) { - value1.set(i); - writer1.append(NullWritable.get(), value1); - } - } finally { - IOUtils.closeStream(writer1); - } - } - - @Override - protected void testProgram() throws Exception { - expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull); - } - - @Override - protected void postSubmit() throws Exception { - for(int i = 0; i < resultPath.length; i++) { - compareResultsByLinesInMemory(expectedResult[i], resultPath[i]); - } - } - - @Parameters - public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException { - - LinkedList<Configuration> tConfigs = new LinkedList<Configuration>(); - - for(int i=1; i <= NUM_PROGRAMS; i++) { - Configuration config = new Configuration(); - 
config.setInteger("ProgramId", i); - tConfigs.add(config); - } - - return toParameterList(tConfigs); - } - - public static class HadoopIOFormatPrograms { - - public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception { - - switch(progId) { - case 1: { - /** - * Test sequence file, including a key access. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>(); - JobConf hdconf = new JobConf(); - SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath)); - HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf); - DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif); - DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() { - @Override - public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception { - return new Tuple2<Long, Text>(value.f0.get(), value.f1); - } - }).sum(0); - sumed.writeAsText(resultPath[0]); - DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() { - @Override - public String map(Tuple2<LongWritable, Text> value) throws Exception { - return value.f1 + " - " + value.f0.get(); - } - }); - res.writeAsText(resultPath[1]); - env.execute(); - - // return expected result - return new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" + - "1 - somestring - 1\n" + - "2 - somestring - 2\n" + - "3 - somestring - 3\n"}; - - } - case 2: { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>(); - JobConf hdconf = new JobConf(); - SequenceFileInputFormat.addInputPath(hdconf, new 
Path(sequenceFileInPathNull)); - HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf); - DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif); - DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() { - @Override - public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception { - return new Tuple2<Void, Long>(null, value.f1.get()); - } - }); - DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1); - res1.writeAsText(resultPath[1]); - res.writeAsText(resultPath[0]); - env.execute(); - - // return expected result - return new String [] {"(null,2)\n" + - "(null,0)\n" + - "(null,1)\n" + - "(null,3)", - "(null,0)\n" + - "(null,1)\n" + - "(null,2)\n" + - "(null,3)"}; - } - default: - throw new IllegalArgumentException("Invalid program id"); - } - - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java deleted file mode 100644 index 00fd1f9..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.hadoopcompatibility.mapred; - - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapred.FileInputFormat; -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.fail; - - -public class HadoopInputFormatTest { - - - public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { - - public DummyVoidKeyInputFormat() { - } - - @Override - public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException { - return null; - } - } - - - @Test - public void checkTypeInformation() { - try { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - Job job = Job.getInstance(); - HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), 
Void.class, Long.class, new JobConf()); - - TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); - TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); - - if(tupleType.isTupleType()) { - if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { - fail("Tuple type information was not set correctly!"); - } - } else { - fail("Type information was not set to tuple type information!"); - } - - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java deleted file mode 100644 index d79afaa..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.hadoopcompatibility.mapreduce; - - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.junit.Test; - -import java.io.IOException; - -import static org.junit.Assert.fail; - - - -public class HadoopInputFormatTest { - - - public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { - - public DummyVoidKeyInputFormat() { - } - - @Override - public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - return null; - } - } - - - @Test - public void checkTypeInformation() { - try { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // Set up the Hadoop Input Format - Job job = Job.getInstance(); - HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job); - - TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType(); - TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); - - if(tupleType.isTupleType()) { - if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { - fail("Tuple type information was not set correctly!"); - } - } else { - fail("Type information was not set to tuple type information!"); - } - - } - catch (Exception ex) { - fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index bcb9764..7bf3e2e 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -116,12 +116,12 @@ under the License. <scope>test</scope> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - <scope>test</scope> - </dependency> + <!--<dependency>--> + <!--<groupId>com.google.guava</groupId>--> + <!--<artifactId>guava</artifactId>--> + <!--<version>${guava.version}</version>--> + <!--<scope>test</scope>--> + <!--</dependency>--> <dependency> <groupId>org.scalatest</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java new file mode 100644 index 0000000..0cb1ac5 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/HadoopIOFormatsITCase.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoop.mapred; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class HadoopIOFormatsITCase extends JavaProgramTestBase { + 
+ private static int NUM_PROGRAMS = 2; + + private int curProgId = config.getInteger("ProgramId", -1); + private String[] resultPath; + private String[] expectedResult; + private String sequenceFileInPath; + private String sequenceFileInPathNull; + + public HadoopIOFormatsITCase(Configuration config) { + super(config); + } + + @Override + protected void preSubmit() throws Exception { + resultPath = new String[] {getTempDirPath("result0"), getTempDirPath("result1") }; + + File sequenceFile = createAndRegisterTempFile("seqFile"); + sequenceFileInPath = sequenceFile.toURI().toString(); + + // Create a sequence file + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + FileSystem fs = FileSystem.get(URI.create(sequenceFile.getAbsolutePath()), conf); + Path path = new Path(sequenceFile.getAbsolutePath()); + + // ------------------ Long / Text Key Value pair: ------------ + int kvCount = 4; + + LongWritable key = new LongWritable(); + Text value = new Text(); + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass()); + for (int i = 0; i < kvCount; i ++) { + if(i == 1) { + // write key = 0 a bit more often. 
+                    for(int a = 0;a < 15; a++) {
+                        key.set(i);
+                        value.set(i+" - somestring");
+                        writer.append(key, value);
+                    }
+                }
+                key.set(i);
+                value.set(i+" - somestring");
+                writer.append(key, value);
+            }
+        } finally {
+            IOUtils.closeStream(writer);
+        }
+
+
+        // ------------------ Null / Long Key Value pair: ------------
+        // Second sequence file: every record has a NullWritable key and a LongWritable value.
+
+        File sequenceFileNull = createAndRegisterTempFile("seqFileNullKey");
+        sequenceFileInPathNull = sequenceFileNull.toURI().toString();
+        path = new Path(sequenceFileInPathNull);
+
+        LongWritable value1 = new LongWritable();
+        SequenceFile.Writer writer1 = null;
+        try {
+            writer1 = SequenceFile.createWriter( fs, conf, path, NullWritable.class, value1.getClass());
+            // one record per value in [0, kvCount)
+            for (int i = 0; i < kvCount; i ++) {
+                value1.set(i);
+                writer1.append(NullWritable.get(), value1);
+            }
+        } finally {
+            IOUtils.closeStream(writer1);
+        }
+    }
+
+    /**
+     * Runs the program selected by {@code curProgId} and keeps the expected results it returns
+     * for the comparison in {@link #postSubmit()}.
+     */
+    @Override
+    protected void testProgram() throws Exception {
+        expectedResult = HadoopIOFormatPrograms.runProgram(curProgId, resultPath, sequenceFileInPath, sequenceFileInPathNull);
+    }
+
+    /**
+     * Compares every expected result with the output written to the result path at the same index.
+     */
+    @Override
+    protected void postSubmit() throws Exception {
+        for(int i = 0; i < resultPath.length; i++) {
+            compareResultsByLinesInMemory(expectedResult[i], resultPath[i]);
+        }
+    }
+
+    /**
+     * Builds one configuration per program id, 1 through {@code NUM_PROGRAMS}, each carrying
+     * its id under the key "ProgramId".
+     *
+     * @return the configurations converted by {@code TestBaseUtils.toParameterList}
+     */
+    @Parameters
+    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+        for(int i=1; i <= NUM_PROGRAMS; i++) {
+            Configuration config = new Configuration();
+            config.setInteger("ProgramId", i);
+            tConfigs.add(config);
+        }
+
+        return TestBaseUtils.toParameterList(tConfigs);
+    }
+
+    /**
+     * The test programs, selected by id.
+     */
+    public static class HadoopIOFormatPrograms {
+
+        /**
+         * Runs one test program and returns the results it is expected to produce.
+         *
+         * @param progId id of the program to run (1 or 2)
+         * @param resultPath output locations; both programs write to index 0 and index 1
+         * @param sequenceFileInPath input of program 1, read as (LongWritable, Text) records
+         * @param sequenceFileInPathNull input of program 2, read as (NullWritable, LongWritable) records
+         * @return the expected output, index-aligned with {@code resultPath}
+         * @throws IllegalArgumentException if {@code progId} is neither 1 nor 2
+         */
+        public static String[] runProgram(int progId, String resultPath[], String sequenceFileInPath, String sequenceFileInPathNull) throws Exception {
+
+            switch(progId) {
+            case 1: {
+                /**
+                 * Test sequence file, including a key access.
+                 */
+                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                SequenceFileInputFormat<LongWritable, Text> sfif = new SequenceFileInputFormat<LongWritable, Text>();
+                JobConf hdconf = new JobConf();
+                SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPath));
+                HadoopInputFormat<LongWritable, Text> hif = new HadoopInputFormat<LongWritable, Text>(sfif, LongWritable.class, Text.class, hdconf);
+                DataSet<Tuple2<LongWritable, Text>> ds = env.createInput(hif);
+                // first output: unwrap the key to a Long and sum it; no groupBy precedes the
+                // aggregation, and the expected result below is a single record
+                DataSet<Tuple2<Long, Text>> sumed = ds.map(new MapFunction<Tuple2<LongWritable, Text>, Tuple2<Long, Text>>() {
+                    @Override
+                    public Tuple2<Long, Text> map(Tuple2<LongWritable, Text> value) throws Exception {
+                        return new Tuple2<Long, Text>(value.f0.get(), value.f1);
+                    }
+                }).sum(0);
+                sumed.writeAsText(resultPath[0]);
+                // second output: one "<value> - <key>" line per distinct key
+                DataSet<String> res = ds.distinct(0).map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+                    @Override
+                    public String map(Tuple2<LongWritable, Text> value) throws Exception {
+                        return value.f1 + " - " + value.f0.get();
+                    }
+                });
+                res.writeAsText(resultPath[1]);
+                env.execute();
+
+                // return expected result
+                return new String [] {"(21,3 - somestring)", "0 - somestring - 0\n" +
+                        "1 - somestring - 1\n" +
+                        "2 - somestring - 2\n" +
+                        "3 - somestring - 3\n"};
+
+            }
+            case 2: {
+                // Test sequence file with NullWritable keys: the key is mapped to null and
+                // only the value is carried on.
+                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                SequenceFileInputFormat<NullWritable, LongWritable> sfif = new SequenceFileInputFormat<NullWritable, LongWritable>();
+                JobConf hdconf = new JobConf();
+                SequenceFileInputFormat.addInputPath(hdconf, new Path(sequenceFileInPathNull));
+                HadoopInputFormat<NullWritable, LongWritable> hif = new HadoopInputFormat<NullWritable, LongWritable>(sfif, NullWritable.class, LongWritable.class, hdconf);
+                DataSet<Tuple2<NullWritable, LongWritable>> ds = env.createInput(hif);
+                DataSet<Tuple2<Void, Long>> res = ds.map(new MapFunction<Tuple2<NullWritable, LongWritable>, Tuple2<Void, Long>>() {
+                    @Override
+                    public Tuple2<Void, Long> map(Tuple2<NullWritable, LongWritable> value) throws Exception {
+                        return new Tuple2<Void, Long>(null, value.f1.get());
+                    }
+                });
+                // grouped on the value field and summed within each group
+                DataSet<Tuple2<Void, Long>> res1 = res.groupBy(1).sum(1);
+                // note the order: the grouped sums go to index 1, the plain records to index 0
+                res1.writeAsText(resultPath[1]);
+                res.writeAsText(resultPath[0]);
+                env.execute();
+
+                // return expected result
+                return new String [] {"(null,2)\n" +
+                        "(null,0)\n" +
+                        "(null,1)\n" +
+                        "(null,3)",
+                        "(null,0)\n" +
+                        "(null,1)\n" +
+                        "(null,2)\n" +
+                        "(null,3)"};
+            }
+            default:
+                throw new IllegalArgumentException("Invalid program id");
+            }
+
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
new file mode 100644
index 0000000..037610e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.test.hadoop.mapred; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + +public class WordCountMapredITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + public WordCountMapredITCase(){ +// setDegreeOfParallelism(4); +// setNumTaskManagers(2); +// setTaskManagerNumSlots(2); + } + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +// env.setDegreeOfParallelism(1); + + + DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath); + + DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() { + @Override + public String map(Tuple2<LongWritable, Text> value) throws Exception { + return value.f1.toString(); + } + }); + + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in 
pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() { + + + @Override + public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1)); + } + }); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = + new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf()); + hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(resultPath)); + + // Output & Execute + words.output(hadoopOutputFormat); + env.execute("Hadoop Compat WordCount"); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java new file mode 100644 index 0000000..3bdaa22 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java 
@@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoop.mapreduce; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +public class WordCountMapreduceITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + public WordCountMapreduceITCase(){ +// setDegreeOfParallelism(4); +// setNumTaskManagers(2); +// setTaskManagerNumSlots(2); + } + + @Override + protected void preSubmit() throws Exception { + textPath = 
createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + + DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath); + + DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() { + @Override + public String map(Tuple2<LongWritable, Text> value) throws Exception { + return value.f1.toString(); + } + }); + + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() { + + + @Override + public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1)); + } + }); + + // Set up Hadoop Output Format + Job job = Job.getInstance(); + HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = + new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), job); + job.getConfiguration().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(job, new Path(resultPath)); + + // Output & Execute + words.output(hadoopOutputFormat); + env.execute("Hadoop Compat WordCount"); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and 
split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala new file mode 100644 index 0000000..c8d6639 --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+
+/** Scala API word count that reads and writes through Hadoop's `mapred` text formats. */
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = _
+  protected var resultPath: String = _
+
+  /** Writes the word count input to a temp file and picks the result directory. */
+  protected override def preSubmit(): Unit = {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  /** Compares the written output with the expected counts. */
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array(".", "_"))
+  }
+
+  /** Builds and runs the plan: read the input, count the words, write (word, count) pairs. */
+  protected def testProgram(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val lines = env
+      .readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      .map { record => record._2.toString }
+
+    val counts = lines
+      .flatMap { line => line.toLowerCase.split("\\W+").filter(token => token.nonEmpty) }
+      .map { token => (token, 1) }
+      .groupBy(0).sum(1)
+    val words = counts.map { wc => (new Text(wc._1), new LongWritable(wc._2)) }
+
+    val hadoopOutputFormat =
+      new HadoopOutputFormat[Text, LongWritable](new TextOutputFormat[Text, LongWritable], new JobConf)
+    val jobConf = hadoopOutputFormat.getJobConf
+    jobConf.set("mapred.textoutputformat.separator", " ")
+    FileOutputFormat.setOutputPath(jobConf, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala new file mode 100644 index 0000000..8988baf --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.api.scala.hadoop.mapreduce
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+
+/** Scala API word count that reads and writes through Hadoop's `mapreduce` text formats. */
+class WordCountMapreduceITCase extends JavaProgramTestBase {
+  protected var textPath: String = _
+  protected var resultPath: String = _
+
+  /** Writes the word count input to a temp file and picks the result directory. */
+  protected override def preSubmit(): Unit = {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  /** Compares the written output with the expected counts. */
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array(".", "_"))
+  }
+
+  /** Builds and runs the plan: read the input, count the words, write (word, count) pairs. */
+  protected def testProgram(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val lines = env
+      .readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      .map { record => record._2.toString }
+
+    val counts = lines
+      .flatMap { line => line.toLowerCase.split("\\W+").filter(token => token.nonEmpty) }
+      .map { token => (token, 1) }
+      .groupBy(0).sum(1)
+    val words = counts.map { wc => (new Text(wc._1), new LongWritable(wc._2)) }
+
+    val job = Job.getInstance()
+    val hadoopOutputFormat =
+      new HadoopOutputFormat[Text, LongWritable](new TextOutputFormat[Text, LongWritable], job)
+    hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
+    FileOutputFormat.setOutputPath(job, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+
