[FLINK-2444] Add tests for HadoopInputFormats

This closes #1628
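For context, a minimal usage sketch of the mapred-API wrapper exercised by the tests below. The constructor shape (inputFormat, keyClass, valueClass, jobConf) and the Tuple2 result type are taken from this commit; the class name, input path, and choice of TextInputFormat are illustrative assumptions, not part of the change:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class MapredUsageSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Point the Hadoop input format at some input; the path is a placeholder.
		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input"));

		// Same constructor shape as in the tests: (inputFormat, keyClass, valueClass, jobConf).
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);

		// Records arrive as Tuple2<key, value>, which is what checkTypeInformation() asserts.
		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.print();
	}
}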
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6c3778f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6c3778f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6c3778f

Branch: refs/heads/master
Commit: a6c3778fd892398d2a6b9c4a02eee1613eda3f42
Parents: bcb34dd
Author: Martin Liesenberg <martin.liesenb...@gmail.com>
Authored: Fri Feb 12 00:18:06 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Mar 23 15:31:54 2016 +0100

----------------------------------------------------------------------
 .../hadoop/mapred/HadoopInputFormatBase.java    |   4 +-
 .../hadoop/mapreduce/HadoopInputFormat.java     |  10 +-
 .../hadoop/mapreduce/HadoopInputFormatBase.java |  12 +-
 .../hadoop/mapred/HadoopInputFormatTest.java    | 275 +++++++++++++++++--
 .../hadoop/mapreduce/HadoopInputFormatTest.java | 202 ++++++++++++--
 5 files changed, 443 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index d80c80a..ef9999f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -148,7 +148,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 			throws IOException {
 		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
 		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-		for(int i=0;i<splitArray.length;i++){
+		for (int i=0; i<splitArray.length; i++) {
 			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
 		}
 		return hiSplit;
@@ -177,7 +177,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public boolean reachedEnd() throws IOException {
-		if(!fetched) {
+		if (!fetched) {
 			fetchNext();
 		}
 		return !hasNext;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
index dadb954..abbcd25 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -18,15 +18,15 @@ package org.apache.flink.api.java.hadoop.mapreduce;
 
-import java.io.IOException;
-
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.hadoop.mapreduce.Job;
+
+import java.io.IOException;
 
 /**
 * InputFormat implementation allowing to use Hadoop (mapreduce) InputFormats with Flink.
@@ -49,10 +49,10 @@ public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<
 
 	@Override
 	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if(!this.fetched) {
+		if (!this.fetched) {
 			fetchNext();
 		}
-		if(!this.hasNext) {
+		if (!this.hasNext) {
 			return null;
 		}
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 4829ce7..73b11eb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -111,7 +111,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 	@Override
 	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
 		// only gather base statistics for FileInputFormats
-		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
+		if (!(mapreduceInputFormat instanceof FileInputFormat)) {
 			return null;
 		}
 
@@ -158,7 +158,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 		jobContext.getCredentials().addAll(this.credentials);
 		Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
-		if(currentUserCreds != null) {
+		if (currentUserCreds != null) {
 			jobContext.getCredentials().addAll(currentUserCreds);
 		}
 
@@ -170,7 +170,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 		}
 
 		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-		for(int i = 0; i < hadoopInputSplits.length; i++){
+		for (int i = 0; i < hadoopInputSplits.length; i++) {
 			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
 		}
 		return hadoopInputSplits;
@@ -208,7 +208,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 
 	@Override
 	public boolean reachedEnd() throws IOException {
-		if(!this.fetched) {
+		if (!this.fetched) {
 			fetchNext();
 		}
 		return !this.hasNext;
@@ -243,7 +243,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 		long latestModTime = 0L;
 
 		// get the file info and check whether the cached statistics are still valid.
-		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
+		for (org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
 			final Path filePath = new Path(hadoopPath.toUri());
 			final FileSystem fs = FileSystem.get(filePath.toUri());
 
@@ -308,7 +308,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCo
 		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
 		configuration.readFields(in);
 
-		if(this.configuration == null) {
+		if (this.configuration == null) {
 			this.configuration = configuration;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
index 63f8df4..3b8d227 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java
@@ -20,21 +20,168 @@ package org.apache.flink.api.java.hadoop.mapred;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.junit.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class HadoopInputFormatTest {
 
+	@Test
+	public void testConfigureWithConfigurableInstance() {
+		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		verify(inputFormat, times(1)).setConf(any(JobConf.class));
+
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+		verify(inputFormat, times(2)).setConf(any(JobConf.class));
+	}
+
+	@Test
+	public void testConfigureWithJobConfigurableInstance() {
+		JobConfigurableDummyInputFormat inputFormat = mock(JobConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		verify(inputFormat, times(1)).configure(any(JobConf.class));
+
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+		verify(inputFormat, times(2)).configure(any(JobConf.class));
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+
+		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+		verify(recordReader, times(1)).createKey();
+		verify(recordReader, times(1)).createValue();
+
+		assertThat(hadoopInputFormat.fetched, is(false));
+
+		hadoopInputFormat.close();
+		verify(recordReader, times(1)).close();
+	}
+
+	@Test
+	public void testOpenWithConfigurableReader() throws Exception {
+		ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+
+		verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
+		verify(recordReader, times(1)).setConf(any(JobConf.class));
+		verify(recordReader, times(1)).createKey();
+		verify(recordReader, times(1)).createValue();
+
+		assertThat(hadoopInputFormat.fetched, is(false));
+
+	}
+
+	@Test
+	public void testCreateInputSplits() throws Exception {
+
+		FileSplit[] result = new FileSplit[1];
+		result[0] = getFileSplit();
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getSplits(any(JobConf.class), anyInt())).thenReturn(result);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.createInputSplits(2);
+
+		verify(inputFormat, times(1)).getSplits(any(JobConf.class), anyInt());
+	}
+
+	@Test
+	public void testReachedEndWithElementsRemaining() throws IOException {
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+		hadoopInputFormat.fetched = true;
+		hadoopInputFormat.hasNext = true;
+
+		assertThat(hadoopInputFormat.reachedEnd(), is(false));
+	}
+
+	@Test
+	public void testReachedEndWithNoElementsRemaining() throws IOException {
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(new DummyInputFormat(), String.class, Long.class, new JobConf());
+		hadoopInputFormat.fetched = true;
+		hadoopInputFormat.hasNext = false;
+
+		assertThat(hadoopInputFormat.reachedEnd(), is(true));
+	}
+
+	@Test
+	public void testFetchNext() throws IOException {
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.next(anyString(), anyLong())).thenReturn(true);
+
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
+		hadoopInputFormat.open(getHadoopInputSplit());
+		hadoopInputFormat.fetchNext();
+
+		verify(recordReader, times(1)).next(anyString(), anyLong());
+		assertThat(hadoopInputFormat.hasNext, is(true));
+		assertThat(hadoopInputFormat.fetched, is(true));
+	}
+
+	@Test
+	public void checkTypeInformation() throws Exception {
+		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
+
+		TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+		TypeInformation<Tuple2<Void,Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+		assertThat(tupleType.isTupleType(), is(true));
+		assertThat(tupleType, is(equalTo(expectedType)));
+	}
+
+	private HadoopInputSplit getHadoopInputSplit() {
+		return new HadoopInputSplit(1, getFileSplit(), new JobConf());
+	}
+
+	private FileSplit getFileSplit() {
+		return new FileSplit(new Path("path"), 1, 2, new String[]{});
+	}
+
 	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
 
 		public DummyVoidKeyInputFormat() {}
 
@@ -44,27 +191,107 @@ public class HadoopInputFormatTest {
 			return null;
 		}
 	}
-
-	@Test
-	public void checkTypeInformation() {
-		try {
-			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(
-					new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, new JobConf());
-
-			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-			if(tupleType.isTupleType()) {
-				if(!((TupleTypeInfo<?>)tupleType).equals(testTupleType)) {
-					fail("Tuple type information was not set correctly!");
-				}
-			} else {
-				fail("Type information was not set to tuple type information!");
-			}
-
-		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+
+	public class DummyRecordReader implements RecordReader<String, Long> {
+
+		@Override
+		public float getProgress() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public boolean next(String s, Long aLong) throws IOException {
+			return false;
 		}
+
+		@Override
+		public String createKey() {
+			return null;
+		}
+
+		@Override
+		public Long createValue() {
+			return null;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+	}
+
+	public class ConfigurableDummyRecordReader implements RecordReader<String, Long>, Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+
+		@Override
+		public boolean next(String s, Long aLong) throws IOException {
+			return false;
+		}
+
+		@Override
+		public String createKey() {
+			return null;
+		}
+
+		@Override
+		public Long createValue() {
+			return null;
+		}
+
+		@Override
+		public long getPos() throws IOException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+
+		@Override
+		public float getProgress() throws IOException {
+			return 0;
+		}
+	}
+
+	public class DummyInputFormat implements InputFormat<String, Long> {
+
+		@Override
+		public InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
+			return new InputSplit[0];
+		}
+
+		@Override
+		public RecordReader<String, Long> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+
+	public class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
+		}
+	}
+
+	public class JobConfigurableDummyInputFormat extends DummyInputFormat implements JobConfigurable {
+
+		@Override
+		public void configure(JobConf jobConf) {}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c3778f/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
index 919a286..d6ec484 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java
@@ -20,22 +20,146 @@ package org.apache.flink.api.java.hadoop.mapreduce;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.List;
 
-import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class HadoopInputFormatTest {
 
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+	@Test
+	public void testConfigure() throws Exception {
+
+		ConfigurableDummyInputFormat inputFormat = mock(ConfigurableDummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.configure(new org.apache.flink.configuration.Configuration());
+
+		verify(inputFormat, times(1)).setConf(any(Configuration.class));
+	}
+
+	@Test
+	public void testCreateInputSplits() throws Exception {
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.createInputSplits(2);
+
+		verify(inputFormat, times(1)).getSplits(any(JobContext.class));
+	}
+
+	@Test
+	public void testOpen() throws Exception {
+		DummyInputFormat inputFormat = mock(DummyInputFormat.class);
+		when(inputFormat.createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(new DummyRecordReader());
+		HadoopInputSplit inputSplit = mock(HadoopInputSplit.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(inputFormat, Job.getInstance(), null);
+		hadoopInputFormat.open(inputSplit);
+
+		verify(inputFormat, times(1)).createRecordReader(any(InputSplit.class), any(TaskAttemptContext.class));
+		assertThat(hadoopInputFormat.fetched, is(false));
+	}
+
+	@Test
+	public void testClose() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.close();
+
+		verify(recordReader, times(1)).close();
+	}
+
+	@Test
+	public void testFetchNextInitialState() throws Exception {
+		DummyRecordReader recordReader = new DummyRecordReader();
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.fetched, is(true));
+		assertThat(hadoopInputFormat.hasNext, is(false));
+	}
+
+	@Test
+	public void testFetchNextRecordReaderHasNewValue() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.nextKeyValue()).thenReturn(true);
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.fetched, is(true));
+		assertThat(hadoopInputFormat.hasNext, is(true));
+	}
+
+	@Test
+	public void testFetchNextRecordReaderThrowsException() throws Exception {
+
+		DummyRecordReader recordReader = mock(DummyRecordReader.class);
+		when(recordReader.nextKeyValue()).thenThrow(new InterruptedException());
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = setupHadoopInputFormat(new DummyInputFormat(), Job.getInstance(), recordReader);
+
+		exception.expect(IOException.class);
+		hadoopInputFormat.fetchNext();
+
+		assertThat(hadoopInputFormat.hasNext, is(true));
+	}
+
+	@Test
+	public void checkTypeInformation() throws Exception {
+
+		HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<>(
+				new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, Job.getInstance());
+
+		TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+		TypeInformation<Tuple2<Void,Long>> expectedType = new TupleTypeInfo<>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+		assertThat(tupleType.isTupleType(), is(true));
+		assertThat(tupleType, is(equalTo(expectedType)));
+	}
+
+
+	private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
+			RecordReader<String, Long> recordReader) {
+
+		HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
+				String.class, Long.class, job);
+		hadoopInputFormat.recordReader = recordReader;
+
+		return hadoopInputFormat;
+	}
+
 	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
 
 		public DummyVoidKeyInputFormat() {}
 
@@ -45,29 +169,61 @@ public class HadoopInputFormatTest {
 			return null;
 		}
 	}
-
-	@Test
-	public void checkTypeInformation() {
-		try {
-			// Set up the Hadoop Input Format
-			Job job = Job.getInstance();
-			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>(
-					new DummyVoidKeyInputFormat<Long>(), Void.class, Long.class, job);
-
-			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
-			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
-
-			if(tupleType.isTupleType()) {
-				if(!((TupleTypeInfo<?>)tupleType).equals(testTupleType)) {
-					fail("Tuple type information was not set correctly!");
-				}
-			} else {
-				fail("Type information was not set to tuple type information!");
-			}
+
+	public class DummyRecordReader extends RecordReader<String, Long> {
+
+		@Override
+		public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
 		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+
+		@Override
+		public boolean nextKeyValue() throws IOException, InterruptedException {
+			return false;
+		}
+
+		@Override
+		public String getCurrentKey() throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public Long getCurrentValue() throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public float getProgress() throws IOException, InterruptedException {
+			return 0;
+		}
+
+		@Override
+		public void close() throws IOException {
+
+		}
+	}
+
+	public class DummyInputFormat extends InputFormat<String, Long> {
+
+		@Override
+		public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+			return null;
+		}
+
+		@Override
+		public RecordReader<String, Long> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+			return new DummyRecordReader();
+		}
+	}
+
+	public class ConfigurableDummyInputFormat extends DummyInputFormat implements Configurable {
+
+		@Override
+		public void setConf(Configuration configuration) {}
+
+		@Override
+		public Configuration getConf() {
+			return null;
 		}
 	}
 }
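The mapreduce-API wrapper tested above is used the same way, except that it is configured through a Job rather than a JobConf, matching the (inputFormat, keyClass, valueClass, job) constructor exercised by setupHadoopInputFormat(). A minimal sketch under the same assumptions as before (placeholder path, TextInputFormat, illustrative class name):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MapreduceUsageSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// The Job carries the Hadoop configuration, as in the tests above.
		Job job = Job.getInstance();
		FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/input"));  // placeholder path

		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, job);

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.print();
	}
}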