[ https://issues.apache.org/jira/browse/FLINK-2444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208199#comment-15208199 ]
ASF GitHub Bot commented on FLINK-2444: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1628#discussion_r57137819 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java --- @@ -20,22 +20,137 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.IOException; +import java.util.List; -import static org.junit.Assert.fail; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.*; public class HadoopInputFormatTest { + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test + public void testConfigure() throws Exception { + + ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null); + hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration()); + + verify(inputFormat, times(1)).setConf(any(Configuration.class)); + } + + @Test + public void testCreateInputSplits() throws Exception { + DummyInputFormat inputFormat = mock(DummyInputFormat.class); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null); + hadoopInputFormat.createInputSplits(2); + + verify(inputFormat, times(1)).getSplits(any(JobContext.class)); + } + + @Test + public void testOpen() throws Exception { + DummyInputFormat inputFormat = mock(DummyInputFormat.class); + when(inputFormat.createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader()); + HadoopInputSplit inputSplit = mock(HadoopInputSplit.class); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null); + hadoopInputFormat.open(inputSplit); + + verify(inputFormat, times(1)).createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class)); + assertThat(hadoopInputFormat.fetched, is(false)); + } + + @Test + public void testClose() throws Exception { + + DummyRecordReader recordReader = mock(DummyRecordReader.class); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); + hadoopInputFormat.close(); + + verify(recordReader, times(1)).close(); + } + + @Test + public void testFetchNextInitialState() throws Exception { + DummyRecordReader recordReader = new DummyRecordReader(); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); + hadoopInputFormat.fetchNext(); + + assertThat(hadoopInputFormat.fetched, is(true)); + assertThat(hadoopInputFormat.hasNext, is(false)); + } + + @Test + public void testFetchNextRecordReaderHasNewValue() throws Exception { + + DummyRecordReader recordReader = mock(DummyRecordReader.class); + when(recordReader.nextKeyValue()).thenReturn(true); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); + hadoopInputFormat.fetchNext(); + + assertThat(hadoopInputFormat.fetched, is(true)); + assertThat(hadoopInputFormat.hasNext, is(true)); + } + + @Test + public void testFetchNextRecordReaderThrowsException() throws Exception { + + DummyRecordReader recordReader = mock(DummyRecordReader.class); + when(recordReader.nextKeyValue()).thenThrow(new InterruptedException()); + + HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader); + + exception.expect(IOException.class); + hadoopInputFormat.fetchNext(); + + assertThat(hadoopInputFormat.hasNext, is(true)); + } + + @Test + public void checkTypeInformation() throws Exception { + + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>( + new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, Job.getInstance()); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + assertThat(tupleType.isTupleType(), is(true)); + assertThat(tupleType, is(equalTo(expectedType))); + } + + + private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job, + RecordReader<String, Long> recordReader) { --- End diff -- please indent only with tabs > Add tests for HadoopInputFormats > -------------------------------- > > Key: FLINK-2444 > URL: https://issues.apache.org/jira/browse/FLINK-2444 > Project: Flink > Issue Type: Test > Components: Hadoop Compatibility, Tests > Affects Versions: 0.9.0, 0.10.0 > Reporter: Fabian Hueske > Assignee: Martin Liesenberg > Labels: starter > > The HadoopInputFormats and HadoopInputFormatBase classes are not sufficiently > covered by unit tests. > We need tests that ensure that the methods of the wrapped Hadoop InputFormats > are correctly called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)