Adding Alfonso Nishikawa's review suggestions and test cases
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/28e10111 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/28e10111 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/28e10111 Branch: refs/heads/master Commit: 28e1011120468f183d7f1fb6371285946420a34b Parents: 7063770 Author: Kevin <djkevi...@yahoo.com> Authored: Mon Sep 19 12:07:02 2016 +0530 Committer: Kevin <djkevi...@yahoo.com> Committed: Mon Sep 19 12:07:02 2016 +0530 ---------------------------------------------------------------------- .../mapreduce/MapReduceSerialization.java | 51 ++++++++++---------- .../gora/mapreduce/MapReduceTestUtils.java | 44 +++++++++++++++++ 2 files changed, 69 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java ---------------------------------------------------------------------- diff --git a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java index fd5c062..a8dcf9e 100644 --- a/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java +++ b/gora-core/src/examples/java/org/apache/gora/examples/mapreduce/MapReduceSerialization.java @@ -43,7 +43,6 @@ public class MapReduceSerialization extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(MapReduceSerialization.class); public MapReduceSerialization() { - } public MapReduceSerialization(Configuration conf) { @@ -56,13 +55,12 @@ public class MapReduceSerialization extends Configured implements Tool { */ public static class CheckDirtyBitsSerializationMapper extends GoraMapper<String, WebPage, Text, WebPage> { - @Override protected void map(String key, WebPage 
page, Context context) - throws IOException ,InterruptedException { - page.setUrl("hola") ; - context.write(new Text(key), page) ; - }; + throws IOException, InterruptedException { + page.setUrl("hola"); + context.write(new Text(key), page); + } } /** @@ -71,29 +69,29 @@ public class MapReduceSerialization extends Configured implements Tool { */ public static class CheckDirtyBytesSerializationReducer extends GoraReducer<Text, WebPage, String, WebPage> { - @Override protected void reduce(Text key, Iterable<WebPage> values, Context context) - throws IOException ,InterruptedException { + throws IOException, InterruptedException { for (WebPage val : values) { - LOG.info(key.toString()) ; - LOG.info(val.toString()) ; - LOG.info(String.valueOf(val.isDirty())) ; + LOG.info(key.toString()); + LOG.info(val.toString()); + LOG.info(String.valueOf(val.isDirty())); context.write(key.toString(), val); } - }; - + } } /** * Creates and returns the {@link Job} for submitting to Hadoop mapreduce. - * @param inStore - * @param query - * @return + * + * @param inStore input store the MR job reads from + * @param query query that selects the input set for the MR job + * @param outStore output store where the MR job writes its results + * @return the configured MR job * @throws IOException */ - public Job createJob(DataStore<String,WebPage> inStore, Query<String,WebPage> query - , DataStore<String,WebPage> outStore) throws IOException { + public Job createJob(DataStore<String, WebPage> inStore, Query<String, WebPage> query + , DataStore<String, WebPage> outStore) throws IOException { Job job = new Job(getConf()); job.setJobName("Check serialization of dirty bits"); @@ -118,10 +116,11 @@ public class MapReduceSerialization extends Configured implements Tool { return job; } - public int mapReduceSerialization(DataStore<String,WebPage> inStore, - DataStore<String, WebPage> outStore) throws IOException, InterruptedException, ClassNotFoundException { - Query<String,WebPage> query = inStore.newQuery(); -
query.setFields("url") ; + public int mapReduceSerialization(DataStore<String, WebPage> inStore, + DataStore<String, WebPage> outStore) + throws IOException, InterruptedException, ClassNotFoundException { + Query<String, WebPage> query = inStore.newQuery(); + query.setFields("url"); Job job = createJob(inStore, query, outStore); return job.waitForCompletion(true) ? 0 : 1; @@ -130,14 +129,14 @@ public class MapReduceSerialization extends Configured implements Tool { @Override public int run(String[] args) throws Exception { - DataStore<String,WebPage> inStore; - DataStore<String,WebPage> outStore; + DataStore<String, WebPage> inStore; + DataStore<String, WebPage> outStore; Configuration conf = new Configuration(); - if(args.length > 0) { + if (args.length > 0) { String dataStoreClass = args[0]; inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class, conf); - if(args.length > 1) { + if (args.length > 1) { dataStoreClass = args[1]; } outStore = DataStoreFactory.getDataStore(dataStoreClass, http://git-wip-us.apache.org/repos/asf/gora/blob/28e10111/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java ---------------------------------------------------------------------- diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java index f7a44f0..91ae1bc 100644 --- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java +++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java @@ -18,6 +18,9 @@ package org.apache.gora.mapreduce; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -26,10 +29,12 @@ import org.apache.avro.Schema.Field; import org.apache.gora.examples.WebPageDataCreator; import org.apache.gora.examples.generated.TokenDatum; import 
org.apache.gora.examples.generated.WebPage; +import org.apache.gora.examples.mapreduce.MapReduceSerialization; import org.apache.gora.examples.mapreduce.QueryCounter; import org.apache.gora.examples.mapreduce.WordCount; import org.apache.gora.examples.spark.SparkWordCount; import org.apache.gora.query.Query; +import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; import org.apache.gora.store.impl.DataStoreBase; import org.apache.hadoop.conf.Configuration; @@ -141,4 +146,43 @@ public class MapReduceTestUtils { assertNotNull("token:" + token + " cannot be found in datastore", datum); assertEquals("count for token:" + token + " is wrong", count, datum.getCount().intValue()); } + + public static void testMapReduceSerialization(Configuration conf, DataStore<String, WebPage> inStore, DataStore<String, + WebPage> outStore) throws Exception { + //Datastore now has to be a Hadoop based datastore + ((DataStoreBase<String, WebPage>) inStore).setConf(conf); + ((DataStoreBase<String, WebPage>) outStore).setConf(conf); + + //create input + WebPage page = WebPage.newBuilder().build(); + page.setUrl("TestURL"); + List<CharSequence> content = new ArrayList<CharSequence>(); + content.add("parsed1"); + content.add("parsed2"); + page.setParsedContent(content); + page.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset()))); + inStore.put("key1", page); + inStore.flush(); + + // expected + WebPage expectedPage = WebPage.newBuilder().build(); + expectedPage.setUrl("hola"); + List<CharSequence> expectedContent = new ArrayList<CharSequence>(); + expectedContent.add("parsed1"); + expectedContent.add("parsed2"); + expectedPage.setParsedContent(expectedContent); + expectedPage.setContent(ByteBuffer.wrap("content".getBytes(Charset.defaultCharset()))); + + //run the job + MapReduceSerialization mapReduceSerialization = new MapReduceSerialization(conf); + mapReduceSerialization.mapReduceSerialization(inStore, outStore); + + Query<String, WebPage> 
outputQuery = outStore.newQuery(); + Result<String, WebPage> serializationResult = outStore.execute(outputQuery); + + while (serializationResult.next()) { + assertEquals(expectedPage, serializationResult.get()); + } + } + }