[FLINK-2444] Add tests for HadoopInputFormats

This closes #1628


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6c3778f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6c3778f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6c3778f

Branch: refs/heads/master
Commit: a6c3778fd892398d2a6b9c4a02eee1613eda3f42
Parents: bcb34dd
Author: Martin Liesenberg <martin.liesenb...@gmail.com>
Authored: Fri Feb 12 00:18:06 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Mar 23 15:31:54 2016 +0100

----------------------------------------------------------------------
 .../hadoop/mapred/HadoopInputFormatBase.java    |   4 +-
 .../hadoop/mapreduce/HadoopInputFormat.java     |  10 +-
 .../hadoop/mapreduce/HadoopInputFormatBase.java |  12 +-
 .../hadoop/mapred/HadoopInputFormatTest.java    | 275 +++++++++++++++++--
 .../hadoop/mapreduce/HadoopInputFormatTest.java | 202 ++++++++++++--
 5 files changed, 443 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
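
For context, here is a minimal usage sketch of the mapred-API HadoopInputFormat
wrapper exercised by the new tests. The TextInputFormat, the LongWritable/Text key
and value classes, the class name, and the input path below are illustrative
assumptions and are not part of this patch.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical example class; not part of the commit.
public class MapredHadoopInputFormatUsageSketch {

        public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                // Wrap a Hadoop mapred InputFormat; the key/value classes must match its generic types.
                JobConf jobConf = new JobConf();
                HadoopInputFormat<LongWritable, Text> hadoopIF =
                                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);
                FileInputFormat.addInputPath(jobConf, new Path("/tmp/input.txt"));

                // Records are exposed as Tuple2<key, value>, the TupleTypeInfo checked by checkTypeInformation().
                DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
                lines.print();
        }
}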


http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index d80c80a..ef9999f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -148,7 +148,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
                        throws IOException {
                org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
                HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-               for(int i=0;i<splitArray.length;i++){
+               for (int i=0; i<splitArray.length; i++) {
                        hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
                }
                return hiSplit;
@@ -177,7 +177,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
        
        @Override
        public boolean reachedEnd() throws IOException {
-               if(!fetched) {
+               if (!fetched) {
                        fetchNext();
                }
                return !hasNext;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
index dadb954..abbcd25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.api.java.hadoop.mapreduce;
 
-import java.io.IOException;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
 
 /**
  * InputFormat implementation allowing to use Hadoop (mapreduce) InputFormats with Flink.
@@ -49,10 +49,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
 
        @Override
        public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-               if(!this.fetched) {
+               if (!this.fetched) {
                        fetchNext();
                }
-               if(!this.hasNext) {
+               if (!this.hasNext) {
                        return null;
                }
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 4829ce7..73b11eb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -111,7 +111,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
                // only gather base statistics for FileInputFormats
-               if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+               if (!(mapreduceInputFormat instanceof FileInputFormat)) {
                        return null;
                }
 
@@ -158,7 +158,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo

                jobContext.getCredentials().addAll(this.credentials);
                Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-               if(currentUserCreds != null) {
+               if (currentUserCreds != null) {
                        jobContext.getCredentials().addAll(currentUserCreds);
                }
 
@@ -170,7 +170,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
                }
                HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];

-               for(int i = 0; i < hadoopInputSplits.length; i++){
+               for (int i = 0; i < hadoopInputSplits.length; i++) {
                        hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
                }
                return hadoopInputSplits;
@@ -208,7 +208,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
        @Override
        public boolean reachedEnd() throws IOException {
-               if(!this.fetched) {
+               if (!this.fetched) {
                        fetchNext();
                }
                return !this.hasNext;
@@ -243,7 +243,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
                long latestModTime = 0L;

                // get the file info and check whether the cached statistics are still valid.
-               for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+               for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
 
                        final Path filePath = new Path(hadoopPath.toUri());
                        final FileSystem fs = FileSystem.get(filePath.toUri());
@@ -308,7 +308,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
                org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
                configuration.readFields(in);
 
-               if(this.configuration == null) {
+               if (this.configuration == null) {
                        this.configuration = configuration;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
index 63f8df4..3b8d227 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -20,21 +20,168 @@ package org.apache.flink.api.java.hadoop.mapred;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.junit.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 public class HadoopInputFormatTest {
 
+       @Test
+       public void testConfigureWithConfigurableInstance() {
+               ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               verify(inputFormat, times(1)).setConf(any(JobConf.class));
+
+               hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+               verify(inputFormat, times(2)).setConf(any(JobConf.class));
+       }
+
+       @Test
+       public void testConfigureWithJobConfigurableInstance() {
+               JobConfigurableDummyInputFormat inputFormat = mock(JobConfigurableDummyInputFormat.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               verify(inputFormat, times(1)).configure(any(JobConf.class));
+
+               hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+               verify(inputFormat, times(2)).configure(any(JobConf.class));
+       }
+
+       @Test
+       public void testOpenClose() throws Exception {
+               DummyRecordReader recordReader = mock(DummyRecordReader.class);
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+               when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               hadoopInputFormat.open(getHadoopInputSplit());
+
+               verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+               verify(recordReader, times(1)).createKey();
+               verify(recordReader, times(1)).createValue();
+
+               assertThat(hadoopInputFormat.fetched, is(false));
+
+               hadoopInputFormat.close();
+               verify(recordReader, times(1)).close();
+       }
+
+       @Test
+       public void testOpenWithConfigurableReader() throws Exception {
+               ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+               when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               hadoopInputFormat.open(getHadoopInputSplit());
+
+               verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+               verify(recordReader, times(1)).setConf(any(JobConf.class));
+               verify(recordReader, times(1)).createKey();
+               verify(recordReader, times(1)).createValue();
+
+               assertThat(hadoopInputFormat.fetched, is(false));
+
+       }
+
+       @Test
+       public void testCreateInputSplits() throws Exception {
+
+               FileSplit[] result = new FileSplit[1];
+               result[0] = getFileSplit();
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+               when(inputFormat.getSplits(any(JobConf.class), anyInt())).thenReturn(result);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               hadoopInputFormat.createInputSplits(2);
+
+               verify(inputFormat, times(1)).getSplits(any(JobConf.class), anyInt());
+       }
+
+       @Test
+       public void testReachedEndWithElementsRemaining() throws IOException {
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+               hadoopInputFormat.fetched = true;
+               hadoopInputFormat.hasNext = true;
+
+               assertThat(hadoopInputFormat.reachedEnd(), is(false));
+       }
+
+       @Test
+       public void testReachedEndWithNoElementsRemaining() throws IOException {
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+               hadoopInputFormat.fetched = true;
+               hadoopInputFormat.hasNext = false;
+
+               assertThat(hadoopInputFormat.reachedEnd(), is(true));
+       }
+
+       @Test
+       public void testFetchNext() throws IOException {
+               DummyRecordReader recordReader = mock(DummyRecordReader.class);
+               when(recordReader.next(anyString(), anyLong())).thenReturn(true);
+
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+               when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+               hadoopInputFormat.open(getHadoopInputSplit());
+               hadoopInputFormat.fetchNext();
+
+               verify(recordReader, times(1)).next(anyString(), anyLong());
+               assertThat(hadoopInputFormat.hasNext, is(true));
+               assertThat(hadoopInputFormat.fetched, is(true));
+       }
+       
+       @Test
+       public void checkTypeInformation() throws Exception {
+               HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+                               new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+               TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+               TypeInformation<Tuple2<Void,Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+               assertThat(tupleType.isTupleType(), is(true));
+               assertThat(tupleType, is(equalTo(expectedType)));
+       }
+
+       private HadoopInputSplit getHadoopInputSplit() {
+               return new HadoopInputSplit(1, getFileSplit(), new JobConf());
+       }
+
+       private FileSplit getFileSplit() {
+               return new FileSplit(new Path("path"), 1, 2, new String[]{});
+       }
+
        public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
 
                public DummyVoidKeyInputFormat() {}
@@ -44,27 +191,107 @@ public class HadoopInputFormatTest {
                        return null;
                }
        }
-       
-       @Test
-       public void checkTypeInformation() {
-               try {
-                       HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(
-                                       new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
-
-                       TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-                       TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-                       if(tupleType.isTupleType()) {
-                               if(!((TupleTypeInfo<?>)tupleType).equals(testTupleType)) {
-                                       fail("Tuple type information was not set correctly!");
-                               }
-                       } else {
-                               fail("Type information was not set to tuple type information!");
-                       }
-
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+
+       public class DummyRecordReader implements RecordReader<String, Long> {
+
+               @Override
+               public float getProgress() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public boolean next(String s, Long aLong) throws IOException {
+                       return false;
                }
+
+               @Override
+               public String createKey() {
+                       return null;
+               }
+
+               @Override
+               public Long createValue() {
+                       return null;
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public void close() throws IOException {
+
+               }
+       }
+
+       public class ConfigurableDummyRecordReader implements RecordReader<String, Long>, Configurable {
+
+               @Override
+               public void setConf(Configuration configuration) {}
+
+               @Override
+               public Configuration getConf() {
+                       return null;
+               }
+
+               @Override
+               public boolean next(String s, Long aLong) throws IOException {
+                       return false;
+               }
+
+               @Override
+               public String createKey() {
+                       return null;
+               }
+
+               @Override
+               public Long createValue() {
+                       return null;
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public void close() throws IOException {
+
+               }
+
+               @Override
+               public float getProgress() throws IOException {
+                       return 0;
+               }
+       }
+
+       public class DummyInputFormat implements InputFormat<String, Long> {
+
+               @Override
+               public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
+                       return new InputSplit[0];
+               }
+
+               @Override
+               public RecordReader<String, Long> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+                       return null;
+               }
+       }
+
+       public class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+               @Override
+               public void setConf(Configuration configuration) {}
+
+               @Override
+               public Configuration getConf() {
+                       return null;
+               }
+       }
+
+       public class JobConfigurableDummyInputFormat extends DummyInputFormat implements JobConfigurable {
+
+               @Override
+               public void configure(JobConf jobConf) {}
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
index 919a286..d6ec484 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -20,22 +20,146 @@ package org.apache.flink.api.java.hadoop.mapreduce;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.List;
 
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class HadoopInputFormatTest {
 
+       @Rule
+       public final ExpectedException exception = ExpectedException.none();
+
+       @Test
+       public void testConfigure() throws Exception {
+
+               ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+               hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+
+               verify(inputFormat, times(1)).setConf(any(Configuration.class));
+       }
+
+       @Test
+       public void testCreateInputSplits() throws Exception {
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+               hadoopInputFormat.createInputSplits(2);
+
+               verify(inputFormat, times(1)).getSplits(any(JobContext.class));
+       }
+
+       @Test
+       public void testOpen() throws Exception {
+               DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+               when(inputFormat.createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
+               HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+               hadoopInputFormat.open(inputSplit);
+
+               verify(inputFormat, times(1)).createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class));
+               assertThat(hadoopInputFormat.fetched, is(false));
+       }
+
+       @Test
+       public void testClose() throws Exception {
+
+               DummyRecordReader recordReader = mock(DummyRecordReader.class);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+               hadoopInputFormat.close();
+
+               verify(recordReader, times(1)).close();
+       }
+
+       @Test
+       public void testFetchNextInitialState() throws Exception {
+               DummyRecordReader recordReader = new DummyRecordReader();
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+               hadoopInputFormat.fetchNext();
+
+               assertThat(hadoopInputFormat.fetched, is(true));
+               assertThat(hadoopInputFormat.hasNext, is(false));
+       }
+
+       @Test
+       public void testFetchNextRecordReaderHasNewValue() throws Exception {
+
+               DummyRecordReader recordReader = mock(DummyRecordReader.class);
+               when(recordReader.nextKeyValue()).thenReturn(true);
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+               hadoopInputFormat.fetchNext();
+
+               assertThat(hadoopInputFormat.fetched, is(true));
+               assertThat(hadoopInputFormat.hasNext, is(true));
+       }
+
+       @Test
+       public void testFetchNextRecordReaderThrowsException() throws Exception {
+
+               DummyRecordReader recordReader = mock(DummyRecordReader.class);
+               when(recordReader.nextKeyValue()).thenThrow(new InterruptedException());
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+
+               exception.expect(IOException.class);
+               hadoopInputFormat.fetchNext();
+
+               assertThat(hadoopInputFormat.hasNext, is(true));
+       }
+
+       @Test
+       public void checkTypeInformation() throws Exception {
+
+               HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+                               new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, Job.getInstance());
+
+               TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+               TypeInformation<Tuple2<Void,Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+               assertThat(tupleType.isTupleType(), is(true));
+               assertThat(tupleType, is(equalTo(expectedType)));
+       }
+
+
+       private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
+                                                                       RecordReader<String, Long> recordReader) {
+
+               HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
+                               String.class, Long.class, job);
+               hadoopInputFormat.recordReader = recordReader;
+
+               return hadoopInputFormat;
+       }
+
        public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
 
                public DummyVoidKeyInputFormat() {}
@@ -45,29 +169,61 @@ public class HadoopInputFormatTest {
                        return null;
                }
        }
-       
-       @Test
-       public void checkTypeInformation() {
-               try {
-                       // Set up the Hadoop Input Format
-                       Job job = Job.getInstance();
-                       HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(
-                                       new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, job);
-
-                       TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-                       TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-                       if(tupleType.isTupleType()) {
-                               if(!((TupleTypeInfo<?>)tupleType).equals(testTupleType)) {
-                                       fail("Tuple type information was not set correctly!");
-                               }
-                       } else {
-                               fail("Type information was not set to tuple type information!");
-                       }
-                       }
+
+       public class DummyRecordReader extends RecordReader<String, Long> {
+
+               @Override
+               public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

                }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+
+               @Override
+               public boolean nextKeyValue() throws IOException, InterruptedException {
+                       return false;
+               }
+
+               @Override
+               public String getCurrentKey() throws IOException, InterruptedException {
+                       return null;
+               }
+
+               @Override
+               public Long getCurrentValue() throws IOException, InterruptedException {
+                       return null;
+               }
+
+               @Override
+               public float getProgress() throws IOException, InterruptedException {
+                       return 0;
+               }
+
+               @Override
+               public void close() throws IOException {
+
+               }
+       }
+
+       public class DummyInputFormat extends InputFormat<String, Long> {
+
+               @Override
+               public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+                       return null;
+               }
+
+               @Override
+               public RecordReader<String, Long> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+                       return new DummyRecordReader();
+               }
+       }
+
+       public class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+
+               @Override
+               public void setConf(Configuration configuration) {}
+
+               @Override
+               public Configuration getConf() {
+                       return null;
                }
        }
 }
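
For completeness, the analogous sketch for the new-API (mapreduce) wrapper tested
above; unlike the mapred variant it is configured through a Job rather than a
JobConf. Again, TextInputFormat, the key/value classes, the class name, and the
input path are assumptions made only for illustration.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical example class; not part of the commit.
public class MapreduceHadoopInputFormatUsageSketch {

        public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                // The mapreduce variant takes a Job, matching the Job.getInstance() calls in the tests above.
                Job job = Job.getInstance();
                HadoopInputFormat<LongWritable, Text> hadoopIF =
                                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);
                FileInputFormat.addInputPath(job, new Path("/tmp/input.txt"));

                // As with the mapred wrapper, each record arrives as a Tuple2<key, value>.
                DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
                lines.print();
        }
}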
