Repository: flink Updated Branches: refs/heads/release-1.0 1909bdb41 -> 1ca4f36d0
[FLINK-2445] Add tests for HadoopOutputFormats Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edf343ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edf343ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edf343ab Branch: refs/heads/release-1.0 Commit: edf343abb6d026d1e2ddb9ba8aaadeca1139bea5 Parents: 1909bdb Author: Ajay Bhat <a.ajay.b...@gmail.com> Authored: Wed Jan 6 16:12:03 2016 +0530 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Mar 23 17:36:57 2016 +0100 ---------------------------------------------------------------------- .../mapreduce/HadoopOutputFormatTest.java | 130 +++++++++++++++++++ 1 file changed, 130 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/edf343ab/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java new file mode 100644 index 0000000..215ca95 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.hadoop.mapreduce; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.*; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class HadoopOutputFormatTest { + + private static final String PATH = "an/ignored/file/"; + private Map<String, Long> map; + + @Test + public void testWriteRecord() { + OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat(); + String key = "Test"; + Long value = 1L; + map = new HashMap<>(); + map.put(key, 0L); + try { + Job job = Job.getInstance(); + Tuple2<String, Long> tuple = new Tuple2<>(); + tuple.setFields(key, value); + HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + hadoopOutputFormat.writeRecord(tuple); + + Long expected = map.get(key); + assertEquals(expected, value); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testOpen() { + OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat(); + try { + Job job = Job.getInstance(); + HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + hadoopOutputFormat.open(1, 4); + } catch (IOException e) { + fail(); + } + } + + @Test + public void testClose() { + OutputFormat<String, Long> dummyOutputFormat = new DummyOutputFormat(); + try { + Job job = Job.getInstance(); + HadoopOutputFormat<String, Long> hadoopOutputFormat = new HadoopOutputFormat<>(dummyOutputFormat, job); + + hadoopOutputFormat.recordWriter = new DummyRecordWriter(); + + final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class); + Mockito.when(outputCommitter.needsTaskCommit(Mockito.any(TaskAttemptContext.class))).thenReturn(true); + Mockito.doNothing().when(outputCommitter).commitTask(Mockito.any(TaskAttemptContext.class)); + hadoopOutputFormat.outputCommitter = outputCommitter; + hadoopOutputFormat.configuration = new Configuration(); + hadoopOutputFormat.configuration.set("mapred.output.dir", PATH); + + hadoopOutputFormat.close(); + } catch (IOException e) { + fail(); + } + } + + + class DummyRecordWriter extends RecordWriter<String, Long> { + @Override + public void write(String key, Long value) throws IOException, InterruptedException { + map.put(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + + } + } + + class DummyOutputFormat extends OutputFormat<String, Long> { + @Override + public RecordWriter<String, Long> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + return null; + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + final OutputCommitter outputCommitter = Mockito.mock(OutputCommitter.class); + Mockito.doNothing().when(outputCommitter).setupJob(Mockito.any(JobContext.class)); + + return outputCommitter; + } + } +}