[ 
https://issues.apache.org/jira/browse/FLINK-2444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208199#comment-15208199
 ] 

ASF GitHub Bot commented on FLINK-2444:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1628#discussion_r57137819
  
    --- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
 ---
    @@ -20,22 +20,137 @@
     
     import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
     import org.apache.flink.api.java.tuple.Tuple2;
     import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    -import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    -import org.apache.hadoop.mapreduce.InputSplit;
    -import org.apache.hadoop.mapreduce.Job;
    -import org.apache.hadoop.mapreduce.RecordReader;
    -import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.conf.Configurable;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapreduce.*;
     import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    +import org.junit.Rule;
     import org.junit.Test;
    +import org.junit.rules.ExpectedException;
     
     import java.io.IOException;
    +import java.util.List;
     
    -import static org.junit.Assert.fail;
    +import static org.hamcrest.CoreMatchers.equalTo;
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Mockito.*;
     
     public class HadoopInputFormatTest {
     
    +   @Rule
    +   public final ExpectedException exception = ExpectedException.none();
    +
    +   @Test
    +   public void testConfigure() throws Exception {
    +
    +           ConfigurableDummyInputFormat inputFormat = 
mock(ConfigurableDummyInputFormat.class);
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
    +           hadoopInputFormat.configure(new 
org.apache.flink.configuration.Configuration());
    +
    +           verify(inputFormat, times(1)).setConf(any(Configuration.class));
    +   }
    +
    +   @Test
    +   public void testCreateInputSplits() throws Exception {
    +           DummyInputFormat inputFormat = mock(DummyInputFormat.class);
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
    +           hadoopInputFormat.createInputSplits(2);
    +
    +           verify(inputFormat, times(1)).getSplits(any(JobContext.class));
    +   }
    +
    +   @Test
    +   public void testOpen() throws Exception {
    +           DummyInputFormat inputFormat = mock(DummyInputFormat.class);
    +           when(inputFormat.createRecordReader(any(InputSplit.class), 
any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
    +           HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
    +           hadoopInputFormat.open(inputSplit);
    +
    +           verify(inputFormat, 
times(1)).createRecordReader(any(InputSplit.class), 
any(TaskAttemptContext.class));
    +           assertThat(hadoopInputFormat.fetched, is(false));
    +   }
    +
    +   @Test
    +   public void testClose() throws Exception {
    +
    +           DummyRecordReader recordReader = mock(DummyRecordReader.class);
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
    +           hadoopInputFormat.close();
    +
    +           verify(recordReader, times(1)).close();
    +   }
    +
    +   @Test
    +   public void testFetchNextInitialState() throws Exception {
    +           DummyRecordReader recordReader = new DummyRecordReader();
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
    +           hadoopInputFormat.fetchNext();
    +
    +           assertThat(hadoopInputFormat.fetched, is(true));
    +           assertThat(hadoopInputFormat.hasNext, is(false));
    +   }
    +
    +   @Test
    +   public void testFetchNextRecordReaderHasNewValue() throws Exception {
    +
    +           DummyRecordReader recordReader = mock(DummyRecordReader.class);
    +           when(recordReader.nextKeyValue()).thenReturn(true);
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
    +           hadoopInputFormat.fetchNext();
    +
    +           assertThat(hadoopInputFormat.fetched, is(true));
    +           assertThat(hadoopInputFormat.hasNext, is(true));
    +   }
    +
    +   @Test
    +   public void testFetchNextRecordReaderThrowsException() throws Exception 
{
    +
    +           DummyRecordReader recordReader = mock(DummyRecordReader.class);
    +           when(recordReader.nextKeyValue()).thenThrow(new 
InterruptedException());
    +
    +           HadoopInputFormat<String, Long> hadoopInputFormat = 
setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
    +
    +           exception.expect(IOException.class);
    +           hadoopInputFormat.fetchNext();
    +
    +           assertThat(hadoopInputFormat.hasNext, is(true));
    +   }
    +
    +   @Test
    +   public void checkTypeInformation() throws Exception {
    +
    +           HadoopInputFormat<Void, Long> hadoopInputFormat = new 
HadoopInputFormat<>(
    +                           new DummyVoidKeyInputFormat<Long>(), 
Void.class, Long.class, Job.getInstance());
    +
    +           TypeInformation<Tuple2<Void,Long>> tupleType = 
hadoopInputFormat.getProducedType();
    +           TypeInformation<Tuple2<Void,Long>> expectedType = new 
TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
    +
    +           assertThat(tupleType.isTupleType(), is(true));
    +           assertThat(tupleType, is(equalTo(expectedType)));
    +   }
    +
    +
    +   private HadoopInputFormat<String, Long> 
setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
    +                                                                           
                                                   RecordReader<String, Long> 
recordReader) {
    --- End diff --
    
    please indent only with tabs


> Add tests for HadoopInputFormats
> --------------------------------
>
>                 Key: FLINK-2444
>                 URL: https://issues.apache.org/jira/browse/FLINK-2444
>             Project: Flink
>          Issue Type: Test
>          Components: Hadoop Compatibility, Tests
>    Affects Versions: 0.9.0, 0.10.0
>            Reporter: Fabian Hueske
>            Assignee: Martin Liesenberg
>              Labels: starter
>
> The HadoopInputFormats and HadoopInputFormatBase classes are not sufficiently 
> covered by unit tests.
> We need tests that ensure that the methods of the wrapped Hadoop InputFormats 
> are correctly called. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to