[
https://issues.apache.org/jira/browse/FLINK-2444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208199#comment-15208199
]
ASF GitHub Bot commented on FLINK-2444:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1628#discussion_r57137819
--- Diff:
flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
---
@@ -20,22 +20,137 @@
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.List;
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.*;
public class HadoopInputFormatTest {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testConfigure() throws Exception {
+
+ ConfigurableDummyInputFormat inputFormat =
mock(ConfigurableDummyInputFormat.class);
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+ hadoopInputFormat.configure(new
org.apache.flink.configuration.Configuration());
+
+ verify(inputFormat, times(1)).setConf(any(Configuration.class));
+ }
+
+ @Test
+ public void testCreateInputSplits() throws Exception {
+ DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+ hadoopInputFormat.createInputSplits(2);
+
+ verify(inputFormat, times(1)).getSplits(any(JobContext.class));
+ }
+
+ @Test
+ public void testOpen() throws Exception {
+ DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+ when(inputFormat.createRecordReader(any(InputSplit.class),
any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
+ HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+ hadoopInputFormat.open(inputSplit);
+
+ verify(inputFormat,
times(1)).createRecordReader(any(InputSplit.class),
any(TaskAttemptContext.class));
+ assertThat(hadoopInputFormat.fetched, is(false));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+
+ DummyRecordReader recordReader = mock(DummyRecordReader.class);
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+ hadoopInputFormat.close();
+
+ verify(recordReader, times(1)).close();
+ }
+
+ @Test
+ public void testFetchNextInitialState() throws Exception {
+ DummyRecordReader recordReader = new DummyRecordReader();
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+ hadoopInputFormat.fetchNext();
+
+ assertThat(hadoopInputFormat.fetched, is(true));
+ assertThat(hadoopInputFormat.hasNext, is(false));
+ }
+
+ @Test
+ public void testFetchNextRecordReaderHasNewValue() throws Exception {
+
+ DummyRecordReader recordReader = mock(DummyRecordReader.class);
+ when(recordReader.nextKeyValue()).thenReturn(true);
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+ hadoopInputFormat.fetchNext();
+
+ assertThat(hadoopInputFormat.fetched, is(true));
+ assertThat(hadoopInputFormat.hasNext, is(true));
+ }
+
+ @Test
+ public void testFetchNextRecordReaderThrowsException() throws Exception
{
+
+ DummyRecordReader recordReader = mock(DummyRecordReader.class);
+ when(recordReader.nextKeyValue()).thenThrow(new
InterruptedException());
+
+ HadoopInputFormat<String, Long> hadoopInputFormat =
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+
+ exception.expect(IOException.class);
+ hadoopInputFormat.fetchNext();
+
+ assertThat(hadoopInputFormat.hasNext, is(true));
+ }
+
+ @Test
+ public void checkTypeInformation() throws Exception {
+
+ HadoopInputFormat<Void, Long> hadoopInputFormat = new
HadoopInputFormat<>(
+ new DummyVoidKeyInputFormat<Long>(),
Void.class, Long.class, Job.getInstance());
+
+ TypeInformation<Tuple2<Void,Long>> tupleType =
hadoopInputFormat.getProducedType();
+ TypeInformation<Tuple2<Void,Long>> expectedType = new
TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+ assertThat(tupleType.isTupleType(), is(true));
+ assertThat(tupleType, is(equalTo(expectedType)));
+ }
+
+
+ private HadoopInputFormat<String, Long>
setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
+
RecordReader<String, Long>
recordReader) {
--- End diff --
Please indent only with tabs (the wrapped parameter lines above are indented with spaces).
> Add tests for HadoopInputFormats
> --------------------------------
>
> Key: FLINK-2444
> URL: https://issues.apache.org/jira/browse/FLINK-2444
> Project: Flink
> Issue Type: Test
> Components: Hadoop Compatibility, Tests
> Affects Versions: 0.9.0, 0.10.0
> Reporter: Fabian Hueske
> Assignee: Martin Liesenberg
> Labels: starter
>
> The HadoopInputFormats and HadoopInputFormatBase classes are not sufficiently
> covered by unit tests.
> We need tests that ensure that the methods of the wrapped Hadoop InputFormats
> are correctly called.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)