http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/CrunchETL.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/CrunchETL.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/CrunchETL.java
new file mode 100755
index 0000000..271083d
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/CrunchETL.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.etl;
+
+import java.util.Map;
+
+import org.apache.bigtop.bigpetstore.contract.PetStoreStatistics;
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class CrunchETL extends PetStoreStatistics {
+
+    public static MapFn<LineItem, String> COUNT_BY_PRODUCT = new 
MapFn<LineItem, String>() {
+        public String map(LineItem lineItem) {
+            try {
+                return lineItem.getDescription();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+        }
+    };
+    public static MapFn<LineItem, String> COUNT_BY_STATE = new MapFn<LineItem, 
String>() {
+        public String map(LineItem lineItem) {
+            try {
+                return lineItem.getDescription();
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+        }
+    };
+
+    PCollection<LineItem> lineItems;
+
+    public CrunchETL(Path input, Path output) throws Exception {
+        Pipeline pipeline = MemPipeline.getInstance();
+        PCollection<String> lines = pipeline.read(From.textFile(new Path(input,
+                "part-r-00000")));
+        System.out.println("crunch : " + lines.getName() + "  "
+                + lines.getSize());
+        lineItems = lines.parallelDo(ETL, Avros.reflects(LineItem.class));
+
+    }
+
+    public static MapFn ETL = new MapFn<String, LineItem>() {
+        @Override
+        public LineItem map(String input) {
+            String[] fields = input.split(",");
+            LineItem li = new LineItem();
+            li.setAppName(fields[1]);
+            li.setFirstName(fields[3]);
+            // ...
+            li.setDescription(fields[fields.length - 1]);
+            return li;
+        }
+    };
+
+    @Override
+    public Map<String, ? extends Number> numberOfTransactionsByState()
+            throws Exception {
+        PTable<String, Long> counts = lineItems.parallelDo(COUNT_BY_STATE,
+                Avros.strings()).count();
+        Map m = counts.materializeToMap();
+
+        System.out.println("Crunch:::  " + m);
+        return m;
+    }
+
+    @Override
+    public Map<String, ? extends Number> numberOfProductsByProduct()
+            throws Exception {
+        PTable<String, Long> counts = lineItems.parallelDo(COUNT_BY_PRODUCT,
+                Avros.strings()).count();
+        Map m = counts.materializeToMap();
+        //CrunchETL. System.out.println("Crunch:::  " + m);
+        return m;
+    }
+
+    public static void main(String... args) throws Exception {
+        /**
+         * PCollection<String> lines = MemPipeline .collectionOf(
+         *  "BigPetStore,storeCode_AK,1  lindsay,franco,Sat Jan 10 00:11:10 
EST 1970,10.5,dog-food"
+         *  "BigPetStore,storeCode_AZ,1  tom,giles,Sun Dec 28 23:08:45 EST 
1969,10.5,dog-food"
+         *  "BigPetStore,storeCode_CA,1  brandon,ewing,Mon Dec 08 20:23:57 EST 
1969,16.5,organic-dog-food"
+         *  "BigPetStore,storeCode_CA,2  angie,coleman,Thu Dec 11 07:00:31 EST 
1969,10.5,dog-food"
+         *  "BigPetStore,storeCode_CA,3  angie,coleman,Tue Jan 20 06:24:23 EST 
1970,7.5,cat-food"
+         *  "BigPetStore,storeCode_CO,1  sharon,trevino,Mon Jan 12 07:52:10 
EST 1970,30.1,antelope snacks"
+         *  "BigPetStore,storeCode_CT,1  kevin,fitzpatrick,Wed Dec 10 05:24:13 
EST 1969,10.5,dog-food"
+         *  "BigPetStore,storeCode_NY,1  dale,holden,Mon Jan 12 23:02:13 EST 
1970,19.75,fish-food"
+         *  "BigPetStore,storeCode_NY,2  dale,holden,Tue Dec 30 12:29:52 EST 
1969,10.5,dog-food"
+         *  "BigPetStore,storeCode_OK,1  donnie,tucker,Sun Jan 18 04:50:26 EST 
1970,7.5,cat-food"
+         * );
+         **/
+        // FAILS
+        Pipeline pipeline = new MRPipeline(CrunchETL.class);
+
+        PCollection<String> lines = pipeline.read(From.textFile(new Path(
+                "/tmp/BigPetStore1388719888255/generated/part-r-00000")));
+
+
+        PCollection<LineItem> lineItems = lines.parallelDo(
+                new MapFn<String, LineItem>() {
+                    @Override
+                    public LineItem map(String input) {
+
+                        System.out.println("proc1 " + input);
+                        String[] fields = input.split(",");
+                        LineItem li = new LineItem();
+                        li.setAppName("" + fields[1]);
+                        li.setFirstName("" + fields[3]);
+                        li.setDescription("" + fields[fields.length - 1]);
+                        return li;
+                    }
+                }, Avros.reflects(LineItem.class));
+
+        for (LineItem i : lineItems.materialize())
+            System.out.println(i);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java
new file mode 100755
index 0000000..a415cf4
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.etl;
+
+import java.io.Serializable;
+
+public class LineItem implements Serializable{
+
+    public LineItem(String appName, String storeCode, Integer lineId, String 
firstName, String lastName, String timestamp, Double price, String description){
+        super();
+        this.appName=appName;
+        this.storeCode=storeCode;
+        this.lineId=lineId;
+        this.firstName=firstName;
+        this.lastName=lastName;
+        this.timestamp=timestamp;
+        this.price=price;
+        this.description=description;
+    }
+
+    String appName;
+    String storeCode;
+    Integer lineId;
+    String firstName;
+    String lastName;
+    String timestamp;
+    Double price;
+    String description;
+
+    public LineItem(){
+        super();
+    }
+
+    public String getAppName(){
+        return appName;
+    }
+
+    public void setAppName(String appName){
+        this.appName=appName;
+    }
+
+    public String getStoreCode(){
+        return storeCode;
+    }
+
+    public void setStoreCode(String storeCode){
+        this.storeCode=storeCode;
+    }
+
+    public int getLineId(){
+        return lineId;
+    }
+
+    public void setLineId(int lineId){
+        this.lineId=lineId;
+    }
+
+    public String getFirstName(){
+        return firstName;
+    }
+
+    public void setFirstName(String firstName){
+        this.firstName=firstName;
+    }
+
+    public String getLastName(){
+        return lastName;
+    }
+
+    public void setLastName(String lastName){
+        this.lastName=lastName;
+    }
+
+    public String getTimestamp(){
+        return timestamp;
+    }
+
+    public void setTimestamp(String timestamp){
+        this.timestamp=timestamp;
+    }
+
+    public double getPrice(){
+        return price;
+    }
+
+    public void setPrice(double price){
+        this.price=price;
+    }
+
+    public String getDescription(){
+        return description;
+    }
+
+    public void setDescription(String description){
+        this.description=description;
+    }
+
+    // other constructors, parsers, etc.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
new file mode 100644
index 0000000..0ca7444
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.etl;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS;
+import org.apache.bigtop.bigpetstore.util.DeveloperTools;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+
+/**
+ * This class operates by ETL'ing the data-set into pig.
+ * The pigServer is persisted through the life of the class, so that the
+ * intermediate data sets created in the constructor can be reused.
+ */
+public class PigCSVCleaner  {
+
+    PigServer pigServer;
+
+    private static Path getCleanedTsvPath(Path outputPath) {
+      return new Path(outputPath, OUTPUTS.tsv.name());
+    }
+
+    public PigCSVCleaner(Path inputPath, Path outputPath, ExecType ex, File... 
scripts)
+            throws Exception {
+        FileSystem fs = FileSystem.get(inputPath.toUri(), new Configuration());
+
+        if(! fs.exists(inputPath)){
+            throw new RuntimeException("INPUT path DOES NOT exist : " + 
inputPath);
+        }
+
+        if(fs.exists(outputPath)){
+            throw new RuntimeException("OUTPUT already exists : " + 
outputPath);
+        }
+        // run pig in local mode
+        pigServer = new PigServer(ex);
+
+        /**
+         * First, split the tabs up.
+         *
+         * BigPetStore,storeCode_OK,2 1,yang,jay,3,flea collar,69.56,Mon Dec 
15 23:33:49 EST 1969
+         *
+         * ("BigPetStore,storeCode_OK,2", "1,yang,jay,3,flea collar,69.56,Mon 
Dec 15 23:33:49 EST 1969")
+         */
+        pigServer.registerQuery("csvdata = LOAD '<i>' AS 
(ID,DETAILS);".replaceAll("<i>", inputPath.toString()));
+
+        // currentCustomerId, firstName, lastName, product.id, 
product.name.toLowerCase, product.price, date
+        /**
+         * Now, we want to split the two tab delimited fields into uniform
+         * fields of comma separated values. To do this, we 1) Internally split
+         * the FIRST and SECOND fields by commas "a,b,c" --> (a,b,c) 2) FLATTEN
+         * the FIRST and SECOND fields. (d,e) (a,b,c) -> d e a b c
+         */
+        pigServer.registerQuery(
+              "id_details = FOREACH csvdata GENERATE "
+              + "FLATTEN(STRSPLIT(ID, ',', 3)) AS " +
+                       "(drop, code, transaction) ,"
+
+              + "FLATTEN(STRSPLIT(DETAILS, ',', 7)) AS " +
+                  "(custId, fname, lname, productId, product:chararray, price, 
date);");
+        pigServer.registerQuery("mahout_records = FOREACH id_details GENERATE 
custId, productId, 1;");
+        pigServer.store("id_details", 
getCleanedTsvPath(outputPath).toString());
+        pigServer.store("mahout_records", new Path(outputPath, 
OUTPUTS.MahoutPaths.Mahout.name()).toString());
+        /**
+         * Now we run scripts... this is where you can add some
+         * arbitrary analytics.
+         *
+         * We add "input" and "output" parameters so that each
+         * script can read them and use them if they want.
+         *
+         * Otherwise, just hardcode your inputs into your pig scripts.
+         */
+        int i = 0;
+        for(File script : scripts) {
+            Map<String,String> parameters = new HashMap<>();
+            parameters.put("input", getCleanedTsvPath(outputPath).toString());
+
+            Path dir = outputPath.getParent();
+            Path adHocOut = new Path(dir, OUTPUTS.pig_ad_hoc_script.name() + 
(i++));
+            System.out.println("Setting default output to " + adHocOut);
+            parameters.put("output", adHocOut.toString());
+            pigServer.registerScript(script.getAbsolutePath(), parameters);
+        }
+    }
+
+    private static File[] files(String[] args,int startIndex) {
+        List<File> files = new ArrayList<File>();
+        for(int i = startIndex ; i < args.length ; i++) {
+            File f = new File(args[i]);
+            if(! f.exists()) {
+                throw new RuntimeException("Pig script arg " + i + " " + 
f.getAbsolutePath() + " not found. ");
+            }
+            files.add(f);
+        }
+        System.out.println(
+                "Ad-hoc analytics:"+
+                "Added  " + files.size() + " pig scripts to post process.  "+
+                "Each one will be given $input and $output arguments.");
+        return files.toArray(new File[]{});
+    }
+
+    public static void main(final String[] args) throws Exception {
+        System.out.println("Starting pig etl " + args.length);
+        Configuration c = new Configuration();
+        int res = ToolRunner.run(c, new Tool() {
+                    Configuration conf;
+                    @Override
+                    public void setConf(Configuration conf) {
+                        this.conf=conf;
+                    }
+
+                    @Override
+                    public Configuration getConf() {
+                        return this.conf;
+                    }
+
+                    @Override
+                    public int run(String[] args) throws Exception {
+                        DeveloperTools.validate(
+                                args,
+                                "generated data directory",
+                                "pig output directory");
+                        new PigCSVCleaner(
+                                new Path(args[0]),
+                                new Path(args[1]),
+                                ExecType.MAPREDUCE,
+                                files(args,2));
+                        return 0;
+                    }
+                }, args);
+        System.exit(res);
+      }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
new file mode 100755
index 0000000..6c8beef
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants;
+import org.apache.bigtop.bigpetstore.util.DeveloperTools;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static 
org.apache.bigtop.bigpetstore.generator.PetStoreTransactionsInputFormat.props;
+
+/**
+ * This is a mapreduce implementation of a generator of a large sentiment
+ * analysis data set. The scenario is as follows:
+ *
+ * The number of records will (roughly) correspond to the output size - each
+ * record is about 80 bytes.
+ *
+ * 1KB set bigpetstore_records=10 1MB set bigpetstore_records=10,000 1GB set
+ * bigpetstore_records=10,000,000 1TB set bigpetstore_records=10,000,000,000
+ */
+public class BPSGenerator {
+
+  public static final int DEFAULT_NUM_RECORDS = 100;
+
+  final static Logger log = LoggerFactory.getLogger(BPSGenerator.class);
+
+  public enum props {
+    bigpetstore_records
+  }
+
+  public static Job createJob(Path output, int records) throws IOException {
+    Configuration c = new Configuration();
+    c.setInt(props.bigpetstore_records.name(), DEFAULT_NUM_RECORDS);
+    return getCreateTransactionRecordsJob(output, c);
+  }
+
+  public static Job getCreateTransactionRecordsJob(Path outputDir, 
Configuration conf)
+          throws IOException {
+    Job job = new Job(conf, "PetStoreTransaction_ETL_" + 
System.currentTimeMillis());
+    // recursively delete the data set if it exists.
+    FileSystem.get(outputDir.toUri(), conf).delete(outputDir, true);
+    job.setJarByClass(BPSGenerator.class);
+    job.setMapperClass(MyMapper.class);
+    // use the default reducer
+    // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setInputFormatClass(PetStoreTransactionsInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    return job;
+  }
+
+  public static class MyMapper extends Mapper<Text, Text, Text, Text> {
+    @Override
+    protected void setup(Context context) throws IOException,
+    InterruptedException {
+      super.setup(context);
+    }
+
+    protected void map(Text key, Text value, Context context)
+            throws java.io.IOException, InterruptedException {
+      context.write(key, value);
+    }
+  }
+
+  public static void main(String args[]) throws Exception {
+    if (args.length != 2) {
+      System.err.println("USAGE : [number of records] [output path]");
+      System.exit(0);
+    } else {
+      Configuration conf = new Configuration();
+      DeveloperTools.validate(args, "# of records", "output path");
+      
conf.setInt(PetStoreTransactionsInputFormat.props.bigpetstore_records.name(),
+              Integer.parseInt(args[0]));
+      getCreateTransactionRecordsJob(new Path(args[1]), 
conf).waitForCompletion(true);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
new file mode 100644
index 0000000..0223c8d
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.bigtop.bigpetstore.generator.util.State
+import org.apache.hadoop.fs.Path
+import parquet.org.codehaus.jackson.format.DataFormatDetector
+import org.slf4j.LoggerFactory
+import java.util.{Collection => JavaCollection}
+import scala.collection.JavaConversions.asJavaCollection
+import java.util.Random
+import scala.collection.mutable.{HashMap, Set, MultiMap}
+import scala.collection.immutable.NumericRange
+
+/**
+ * This class generates random customer data. The generated customer
+ * ids will be consecutive. The client code that generates the transactions
+ * records needs to know the available customer ids. If we keep the customer
+ * ids consecutive here. we don't have to store those ids in memory, or perform
+ * costly lookups. Once we introduce something that allows efficient lookup
+ * of data, we can do something else as well.
+ *
+ * The generated customer ids will start from 1. So, if we have 100 customers,
+ * the ids will be [1, 100].
+ */
+class CustomerGenerator(val desiredCustomerCount: Int, val outputPath: Path) {
+  private val logger = LoggerFactory.getLogger(getClass)
+  private val random = new Random;
+  private val assertion = "The generateCustomerRecords() hasn't been called 
yet";
+  private var customerFileGenerated = false
+  private val _stateToCustomerIds = new HashMap[State, NumericRange[Long]]
+
+  def isCustomerFileGenrated = customerFileGenerated
+
+  def customerIds(state: State) = {
+    assert(customerFileGenerated, assertion)
+    _stateToCustomerIds(state)
+  }
+
+  def generateCustomerRecords() = {
+    val config = new Configuration
+    val fs = FileSystem.getLocal(config)
+
+    assert(!fs.exists(outputPath))
+
+    val outputStream = fs.create(outputPath)
+
+    var currentId: Long = 1
+    logger.info("Generating customer records at: {}", 
fs.pathToFile(outputPath))
+    for (state <- State.values();
+            stateCustomerCount = (state.probability * desiredCustomerCount) 
toLong;
+            random = new Random(state.hashCode);
+            i <- 1L to stateCustomerCount) {
+      val customerRecord = CustomerGenerator.createRecord(currentId, state, 
random);
+      logger.info("generated customer: {}", customerRecord)
+      outputStream.writeBytes(customerRecord)
+
+      if(i == 1) {
+        val stateCustomerIdRange = currentId until (currentId + 
stateCustomerCount);
+        _stateToCustomerIds += (state -> stateCustomerIdRange)
+      }
+      currentId += 1
+    }
+
+    println(_stateToCustomerIds)
+    outputStream.flush
+    outputStream.close
+    customerFileGenerated = true
+  }
+}
+
+object CustomerGenerator {
+  val OUTPUT_FILE_NAME = "customers"
+
+  private def createRecord(id: Long, state: State, r: Random) = {
+    val firstName = DataForger.firstName
+    val lastName = DataForger.lastName
+    
s"$id\t${DataForger.firstName(r)}\t${DataForger.lastName(r)}\t${state.name}\n"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransaction.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransaction.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransaction.java
new file mode 100755
index 0000000..27a3407
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransaction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.util.Date;
+
+public interface PetStoreTransaction {
+
+    public String getFirstName();
+
+    public String getLastName();
+
+    public String getProduct();
+
+    public Date getDate();
+
+    public Integer getPrice();
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
new file mode 100755
index 0000000..d350cc8
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * What does an `InputSplit` actually do? From the Javadocs, it looks like ...
+ * absolutely nothing.
+ *
+ * Note: for some reason, you *have* to implement Writable, even if your 
methods
+ * do nothing, or you will got strange and un-debuggable null pointer
+ * exceptions.
+ */
+public class PetStoreTransactionInputSplit extends InputSplit implements
+        Writable {
+
+    public PetStoreTransactionInputSplit() {
+    }
+
+    public int records;
+    public State state;
+    public Range<Long> customerIdRange;
+
+    public PetStoreTransactionInputSplit(int records, Range<Long> 
customerIdRange, State state) {
+        this.records = records;
+        this.state = state;
+        this.customerIdRange = customerIdRange;
+    }
+
+    public void readFields(DataInput dataInputStream) throws IOException {
+        records = dataInputStream.readInt();
+        state = State.valueOf(dataInputStream.readUTF());
+        customerIdRange = Range.between(dataInputStream.readLong(), 
dataInputStream.readLong());
+    }
+
+    public void write(DataOutput dataOutputStream) throws IOException {
+        dataOutputStream.writeInt(records);
+        dataOutputStream.writeUTF(state.name());
+        dataOutputStream.writeLong(customerIdRange.getMinimum());
+        dataOutputStream.writeLong(customerIdRange.getMaximum());
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+        return new String[] {};
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+        return records;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
new file mode 100755
index 0000000..4c22e36
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import 
org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A simple input split that fakes input.
+ */
+public class PetStoreTransactionsInputFormat extends
+    FileInputFormat<Text, Text> {
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(
+          final InputSplit inputSplit, TaskAttemptContext arg1)
+                  throws IOException, InterruptedException {
+    return new RecordReader<Text, Text>() {
+
+      @Override
+      public void close() throws IOException {
+
+      }
+
+      /**
+       * We need the "state" information to generate records. - Each state
+       * has a probability associated with it, so that our data set can be
+       * realistic (i.e. Colorado should have more transactions than rhode
+       * island).
+       *
+       * - Each state also will its name as part of the key.
+       *
+       * - This task would be distributed, for example, into 50 nodes on a
+       * real cluster, each creating the data for a given state.
+       */
+
+      PetStoreTransactionInputSplit bpsInputplit = 
(PetStoreTransactionInputSplit) inputSplit;
+      int records = bpsInputplit.records;
+      // TODO why not send the whole InputSplit there?
+      Iterator<KeyVal<String, String>> data =
+              (new TransactionIteratorFactory(records, 
bpsInputplit.customerIdRange, bpsInputplit.state)).data();
+      KeyVal<String, String> currentRecord;
+
+      @Override
+      public Text getCurrentKey() throws IOException,
+      InterruptedException {
+        return new Text(currentRecord.key());
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException,
+      InterruptedException {
+        return new Text(currentRecord.value());
+      }
+
+      @Override
+      public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+              throws IOException, InterruptedException {
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException,
+      InterruptedException {
+        if (data.hasNext()) {
+          currentRecord = data.next();
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return 0f;
+      }
+
+    };
+  }
+
+  public enum props {
+    bigpetstore_records
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext arg) throws IOException {
+    int numRecordsDesired = arg
+            .getConfiguration()
+            .getInt(PetStoreTransactionsInputFormat.props.bigpetstore_records
+                    .name(), -1);
+    if (numRecordsDesired == -1) {
+      throw new RuntimeException(
+              "# of total records not set in configuration object: "
+                      + arg.getConfiguration());
+    }
+
+    List<InputSplit> list = new ArrayList<InputSplit>();
+    long customerIdStart = 1;
+    for (State s : State.values()) {
+      int numRecords = numRecords(numRecordsDesired, s.probability);
+      // each state is assigned a range of customer-ids from which it can 
choose.
+      // The number of customers can be as many as the number of transactions.
+      Range<Long> customerIdRange = Range.between(customerIdStart, 
customerIdStart + numRecords - 1);
+      PetStoreTransactionInputSplit split =
+              new PetStoreTransactionInputSplit(numRecords, customerIdRange, 
s);
+      System.out.println(s + " _ " + split.records);
+      list.add(split);
+      customerIdStart += numRecords;
+    }
+    return list;
+  }
+
+  private int numRecords(int numRecordsDesired, float probability) {
+    return (int) (Math.ceil(numRecordsDesired * probability));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
new file mode 100644
index 0000000..54ae8fe
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/Product.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.math.BigDecimal;
+import static org.apache.bigtop.bigpetstore.generator.util.ProductType.*;
+
+public enum Product {
+  DOG_FOOD(DOG, 10.50),
+  ORGANIC_DOG_FOOD(DOG, 16.99),
+  STEEL_LEASH(DOG, 19.99),
+  FUZZY_COLLAR(DOG, 24.90),
+  LEATHER_COLLAR(DOG, 18.90),
+  CHOKE_COLLAR(DOG, 15.50),
+  DOG_HOUSE(DOG, 109.99),
+  CHEWY_BONE(DOG, 20.10),
+  DOG_VEST(DOG, 19.99),
+  DOG_SOAP(DOG, 5.45),
+
+  CAT_FOOD(CAT, 7.50),
+  FEEDER_BOWL(CAT, 10.99),
+  LITTER_BOX(CAT, 24.95),
+  CAT_COLLAR(CAT, 7.95),
+  CAT_BLANKET(CAT, 14.49),
+
+  TURTLE_PELLETS(TURTLE, 4.95),
+  TURTLE_FOOD(TURTLE, 10.90),
+  TURTLE_TUB(TURTLE, 40.45),
+
+  FISH_FOOD(FISH, 12.50),
+  SALMON_BAIT(FISH, 29.95),
+  FISH_BOWL(FISH, 20.99),
+  AIR_PUMP(FISH, 13.95),
+  FILTER(FISH, 34.95),
+
+  DUCK_COLLAR(DUCK, 13.25),
+  DUCK_FOOD(DUCK, 20.25),
+  WADING_POOL(DUCK, 45.90);
+
+  /*
+  ANTELOPE_COLLAR(OTHER, 19.90),
+  ANTELOPE_SNACKS(OTHER, 29.25),
+  RODENT_CAGE(OTHER, 39.95),
+  HAY_BALE(OTHER, 4.95),
+  COW_DUNG(OTHER, 1.95),
+  SEAL_SPRAY(OTHER, 24.50),
+  SNAKE_BITE_OINTMENT(OTHER, 29.90);
+  */
+  private final BigDecimal price;
+  public final ProductType productType;
+  private Product(ProductType productType, double price) {
+    this.price = BigDecimal.valueOf(price);
+    this.productType = productType;
+  }
+
+  public int id() {
+    return this.ordinal();
+  }
+
+  public BigDecimal price() {
+    return this.price;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
new file mode 100644
index 0000000..af9ea7f
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/ProductType.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public enum ProductType {
+  DOG, CAT, TURTLE, FISH, DUCK;
+
+  private List<Product> products;
+
+  public List<Product> getProducts() {
+    if(products == null) {
+      generateProductList();
+    }
+    return products;
+  }
+
+  private void generateProductList() {
+    List<Product> products = new ArrayList<>();
+    for(Product p : Product.values()) {
+      if(p.productType == this) {
+        products.add(p);
+      }
+    }
+    this.products = Collections.unmodifiableList(products);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
new file mode 100644
index 0000000..2c729a7
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/generator/util/State.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator.util;
+
+import java.util.Random;
+
+
+/**
+ * Each "state" has a pet store , with a certain "proportion" of the
+ * transactions.
+ */
+public enum State {
+  // Each state is associated with a relative probability.
+  AZ(.1f),
+  AK(.1f),
+  CT(.1f),
+  OK(.1f),
+  CO(.1f),
+  CA(.3f),
+  NY(.2f);
+
+  public static Random rand = new Random();
+  public float probability;
+
+  private State(float probability) {
+    this.probability = probability;
+  }
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
new file mode 100644
index 0000000..10acd5a
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/recommend/ItemRecommender.scala
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.recommend
+
+import org.apache.mahout.cf.taste.hadoop.als.RecommenderJob
+import org.apache.mahout.cf.taste.hadoop.als.ParallelALSFactorizationJob
+import java.io.File
+import parquet.org.codehaus.jackson.map.DeserializerFactory.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.Configurable
+import org.apache.hadoop.util.ToolRunner
+import org.apache.mahout.cf.taste.hadoop.als.SharingMapper
+import org.apache.hadoop.util.Tool
+import org.apache.bigtop.bigpetstore.util.DeveloperTools
+
+// We don't need to wrap these two jobs in ToolRunner.run calls since the only
+// thing that we are doing right now is calling the run() methods of 
RecommenderJob
+// and ParallelALSFactorizationJob. Both of these classes have a main() method 
that
+// internally calls ToolRunner.run with all the command line args passed. So, 
if
+// we want to run this code from the command line, we can easily do so by 
running
+// the main methods of the ParallelALSFactorizationJob, followed by running the
+// main method of RecommenderJob. That would also take care of the multiple-jvm
+// instance issue metioned in the comments below, so the call to
+class ItemRecommender(private val inputDir: String,
+        private val factorizationOutputDir: String,
+        private val recommendationsOutputDir: String) {
+  private val recommenderJob = new RecommenderJob
+  private val factorizationJob = new ParallelALSFactorizationJob
+
+  private def tempDir = "/tmp/mahout_" + System.currentTimeMillis
+
+  private def performAlsFactorization() = {
+    ToolRunner.run(factorizationJob, Array(
+        "--input", inputDir,
+        "--output", factorizationOutputDir,
+        "--lambda", "0.1",
+        "--tempDir", tempDir,
+        "--implicitFeedback", "false",
+        "--alpha", "0.8",
+        "--numFeatures", "2",
+        "--numIterations", "5",
+        "--numThreadsPerSolver", "1"))
+  }
+
+  private def generateRecommendations() = {
+    ToolRunner.run(recommenderJob, (Array(
+        "--input", factorizationOutputDir + "/userRatings/",
+        "--userFeatures", factorizationOutputDir + "/U/",
+        "--itemFeatures", factorizationOutputDir + "/M/",
+        "--numRecommendations", "1",
+        "--output", recommendationsOutputDir,
+        "--maxRating", "1")))
+  }
+
+  // At this point, the performAlsFactorization generateRecommendations
+  // and this method can not be run from the same VM instance. These two jobs
+  // share a common static variable which is not being handled correctly.
+  // This, unfortunately, results in a class-cast exception being thrown. 
That's
+  // why the resetFlagInSharedAlsMapper is required. See the comments on
+  // resetFlagInSharedAlsMapper() method.
+  def recommend = {
+    performAlsFactorization
+    resetFlagInSharedAlsMapper
+    generateRecommendations
+  }
+
+  // necessary for local execution in the same JVM only. If the 
performAlsFactorization()
+  // and generateRecommendations() calls are performed in separate JVM 
instances, this
+  // would be taken care of automatically. However, if we want to run this two 
methods
+  // as one task, we need to clean up the static state set by these methods, 
and we don't
+  // have any legitimate way of doing this directly. This clean-up should have 
been
+  // performed by ParallelALSFactorizationJob class after the job is finished.
+  // TODO: remove this when a better way comes along, or 
ParallelALSFactorizationJob
+  // takes responsibility.
+  private def resetFlagInSharedAlsMapper {
+    val m = classOf[SharingMapper[_, _, _, _, _]].getDeclaredMethod("reset");
+    m setAccessible true
+    m.invoke(null)
+  }
+}
+
+object ItemRecommender {
+  def main(args: Array[String]) {
+      val res = ToolRunner.run(new Configuration(), new Tool() {
+      var conf: Configuration = _;
+
+      override def setConf(conf: Configuration) {
+        this.conf=conf;
+      }
+
+
+      override def getConf() = {
+        this.conf;
+      }
+
+
+      override def run(toolArgs: Array[String]) = {
+        val ir = new ItemRecommender(toolArgs(0), toolArgs(1), toolArgs(2))
+        ir.recommend
+        0;
+      }
+    }, args);
+    System.exit(res);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
new file mode 100755
index 0000000..01a6b95
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/BigPetStoreConstants.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Static final constants
+ *
+ * is useful to have the basic sql here as the HIVE SQL can vary between hive
+ * versions if updated here will update everywhere
+ */
+
+package org.apache.bigtop.bigpetstore.util;
+
+public class BigPetStoreConstants {
+
+   //Files should be stored in graphviz arch.dot
+   public static enum OUTPUTS {
+        generated,//generator
+        cleaned,//pig
+        tsv,
+        pig_ad_hoc_script,
+        CUSTOMER_PAGE; //crunchhh
+
+        public static enum MahoutPaths {
+          Mahout,
+          AlsFactorization,
+          AlsRecommendations
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/DeveloperTools.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/DeveloperTools.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/DeveloperTools.java
new file mode 100755
index 0000000..06671b9
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/DeveloperTools.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Dev utilities for testing arguments etc...
+ */
+public class DeveloperTools {
+
+    /**
+     * Validates that the expected args are present in the "args" array.
+     * Just some syntactic sugar for good arg error handling.
+     * @param args
+     * @param expected arguments.
+     */
+    public static void validate(String[] args, String... expected) {
+        int i=-1;
+        try{
+            for(i = 0 ; i < expected.length ; i++) {
+                System.out.println("VALUE OF " + expected[i] + " = " + 
args[i]);
+            }
+        }
+        catch(Throwable t) {
+            System.out.println("Argument " + i + " not available.");
+            System.out.println("We expected " + expected.length + " arguments 
for this phase");
+        }
+
+
+    }
+    public static void main(String[] args) throws Exception {
+        Log LOG = LogFactory.getLog(Job.class);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
new file mode 100644
index 0000000..c652beb
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/NumericalIdUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.util;
+
+import org.apache.bigtop.bigpetstore.generator.util.State;
+
+/**
+ * User and Product IDs need numerical
+ * identifiers for recommender algorithms
+ * which attempt to interpolate new
+ * products.
+ *
+ * TODO: Delete this class. Its not necessarily required: We might just use 
HIVE HASH() as our
+ * standard for this.
+ */
+public class NumericalIdUtils {
+
+    /**
+     * People: Leading with ordinal code for state.
+     */
+    public static long toId(State state, String name){
+        String fromRawData =
+                state==null?
+                        name:
+                         (state.name()+"_"+name);
+        return fromRawData.hashCode();
+    }
+    /**
+     * People: Leading with ordinal code for state.
+     */
+    public static long toId(String name){
+        return toId(null,name);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/PetStoreParseFunctions.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/PetStoreParseFunctions.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/PetStoreParseFunctions.java
new file mode 100755
index 0000000..056dfc3
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/PetStoreParseFunctions.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * TODO: This might be dead code.
+ */
+public class PetStoreParseFunctions {
+
+    String[] headers = { "code", "city", "country", "lat", "lon" };
+
+    public Map<String, Object> parse(String line) {
+
+        Map<String, Object> resultMap = new HashMap<String, Object>();
+
+        List<String> csvObj = null;
+
+        String[] temp = line.split(",");
+        csvObj = new ArrayList<String>(Arrays.asList(temp));
+
+        if (csvObj.isEmpty()) {
+            return resultMap;
+        }
+
+        int k = 0;
+
+        for (String valueStr : csvObj) {
+
+            resultMap.put(headers[k++], valueStr);
+
+        }
+
+        return resultMap;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/StringUtils.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/StringUtils.java
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/StringUtils.java
new file mode 100644
index 0000000..e4e012e
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/java/org/apache/bigtop/bigpetstore/util/StringUtils.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.util;
+
+import java.util.ArrayList;
+
+/**
+********************************************************************
+* Borrowed from apache-commons-lang StringUtils, overtime we might
+* add more elements here .
+* To maintain minor dependencies on a cluster sometimes this is easier
+* jar's manually in the hadoop classpath or via DistributedCache.
+********************************************************************/
+
+public class StringUtils {
+
+     public static String substringBefore(String str, String separator) {
+         int pos = str.indexOf(separator);
+         if (pos == -1) {
+             return str;
+         }
+         return str.substring(0, pos);
+     }
+
+
+     public static String substringAfter(String str, String separator) {
+         if (str.length()==0) {
+             return str;
+         }
+         if (separator == null) {
+             return "";
+         }
+         int pos = str.indexOf(separator);
+         if (pos == -1) {
+             return "";
+         }
+         return str.substring(pos + separator.length());
+     }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
new file mode 100644
index 0000000..c5e6513
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/DataForger.scala
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator
+
+import java.util.Random
+import org.jfairy.Fairy
+import java.util.Date
+
+
+/**
+ * Generic class for generating random data. This class was created so
+ * that we can provide a uniform API for getting random data. If we want,
+ * we can replace the underlying data-generation implementation using
+ * existing libraries.
+ */
+object DataForger {
+  private val random = new Random
+  private val fairy = Fairy.create()
+
+  // TODO: Jay / Bhashit : refactor to use a random data generator?
+  def firstName(random: Random) = firstNames(random.nextInt(firstNames.length))
+  def firstName: String = firstName(random)
+
+  // TODO: Jay / Bhashit : refactor to use a random data generator?
+  def lastName(random: Random) = lastNames(random.nextInt(lastNames.length))
+  def lastName: String = lastName(random)
+
+  def randomDateInPastYears(maxYearsEarlier: Int) = 
fairy.dateProducer().randomDateInThePast(maxYearsEarlier).toDate()
+
+  private val firstNames =  IndexedSeq("Aaron", "Abby", "Abigail", "Adam",
+          "Alan", "Albert", "Alex", "Alexandra", "Alexis", "Alice", "Alicia",
+          "Alisha", "Alissa", "Allen", "Allison", "Alyssa", "Amanda", "Amber",
+          "Amy", "Andrea", "Andrew", "Andy", "Angel", "Angela", "Angie",
+          "Anita", "Ann", "Anna", "Annette", "Anthony", "Antonio", "April",
+          "Arthur", "Ashley", "Audrey", "Austin", "Autumn", "Baby", "Barb",
+          "Barbara", "Becky", "Benjamin", "Beth", "Bethany", "Betty",
+          "Beverly", "Bill", "Billie", "Billy", "Blake", "Bob", "Bobbie",
+          "Bobby", "Bonnie", "Brad", "Bradley", "Brady", "Brandi", "Brandon",
+          "Brandy", "Breanna", "Brenda", "Brent", "Brett", "Brian", "Brianna",
+          "Brittany", "Brooke", "Brooklyn", "Bruce", "Bryan", "Caleb",
+          "Cameron", "Candy", "Carl", "Carla", "Carmen", "Carol", "Carolyn",
+          "Carrie", "Casey", "Cassandra", "Catherine", "Cathy", "Chad",
+          "Charlene", "Charles", "Charlie", "Charlotte", "Chase", "Chasity",
+          "Chastity", "Chelsea", "Cheryl", "Chester", "Cheyenne", "Chris",
+          "Christian", "Christina", "Christine", "Christoph", "Christopher",
+          "Christy", "Chuck", "Cindy", "Clara", "Clarence", "Clayton",
+          "Clifford", "Clint", "Cody", "Colton", "Connie", "Corey", "Cory",
+          "Courtney", "Craig", "Crystal", "Curtis", "Cynthia", "Dakota",
+          "Dale", "Dallas", "Dalton", "Dan", "Dana", "Daniel", "Danielle",
+          "Danny", "Darla", "Darlene", "Darrell", "Darren", "Dave", "David",
+          "Dawn", "Dean", "Deanna", "Debbie", "Deborah", "Debra", "Denise",
+          "Dennis", "Derek", "Derrick", "Destiny", "Devin", "Diana", "Diane",
+          "Dillon", "Dixie", "Dominic", "Don", "Donald", "Donna", "Donnie",
+          "Doris", "Dorothy", "Doug", "Douglas", "Drew", "Duane", "Dustin",
+          "Dusty", "Dylan", "Earl", "Ed", "Eddie", "Edward", "Elaine",
+          "Elizabeth", "Ellen", "Emily", "Eric", "Erica", "Erika", "Erin",
+          "Ernest", "Ethan", "Eugene", "Eva", "Evelyn", "Everett", "Faith",
+          "Father", "Felicia", "Floyd", "Francis", "Frank", "Fred", "Gabriel",
+          "Gage", "Gail", "Gary", "Gene", "George", "Gerald", "Gina", "Ginger",
+          "Glen", "Glenn", "Gloria", "Grace", "Greg", "Gregory", "Haley",
+          "Hannah", "Harley", "Harold", "Harry", "Heath", "Heather", "Heidi",
+          "Helen", "Herbert", "Holly", "Hope", "Howard", "Hunter", "Ian",
+          "Isaac", "Jack", "Jackie", "Jacob", "Jade", "Jake", "James", "Jamie",
+          "Jan", "Jane", "Janet", "Janice", "Jared", "Jasmine", "Jason", "Jay",
+          "Jean", "Jeannie", "Jeff", "Jeffery", "Jeffrey", "Jenna", "Jennifer",
+          "Jenny", "Jeremiah", "Jeremy", "Jerry", "Jesse", "Jessica", "Jessie",
+          "Jill", "Jim", "Jimmy", "Joann", "Joanne", "Jodi", "Jody", "Joe",
+          "Joel", "Joey", "John", "Johnathan", "Johnny", "Jon", "Jonathan",
+          "Jonathon", "Jordan", "Joseph", "Josh", "Joshua", "Joyce", "Juanita",
+          "Judy", "Julia", "Julie", "Justin", "Kaitlyn", "Karen", "Katelyn",
+          "Katherine", "Kathleen", "Kathryn", "Kathy", "Katie", "Katrina",
+          "Kay", "Kayla", "Kaylee", "Keith", "Kelly", "Kelsey", "Ken",
+          "Kendra", "Kenneth", "Kenny", "Kevin", "Kim", "Kimberly", "Kris",
+          "Krista", "Kristen", "Kristin", "Kristina", "Kristy", "Kyle",
+          "Kylie", "Lacey", "Laken", "Lance", "Larry", "Laura", "Lawrence",
+          "Leah", "Lee", "Leonard", "Leroy", "Leslie", "Levi", "Lewis",
+          "Linda", "Lindsay", "Lindsey", "Lisa", "Lloyd", "Logan", "Lois",
+          "Loretta", "Lori", "Louis", "Lynn", "Madison", "Mandy", "Marcus",
+          "Margaret", "Maria", "Mariah", "Marie", "Marilyn", "Marion", "Mark",
+          "Marlene", "Marsha", "Martha", "Martin", "Marty", "Marvin", "Mary",
+          "Mary ann", "Mason", "Matt", "Matthew", "Max", "Megan", "Melanie",
+          "Melinda", "Melissa", "Melody", "Michael", "Michelle", "Mickey",
+          "Mike", "Mindy", "Miranda", "Misty", "Mitchell", "Molly", "Monica",
+          "Morgan", "Mother", "Myron", "Nancy", "Natasha", "Nathan",
+          "Nicholas", "Nick", "Nicole", "Nina", "Noah", "Norma", "Norman",
+          "Olivia", "Paige", "Pam", "Pamela", "Pat", "Patricia", "Patrick",
+          "Patty", "Paul", "Paula", "Peggy", "Penny", "Pete", "Phillip",
+          "Phyllis", "Rachael", "Rachel", "Ralph", "Randall", "Randi", "Randy",
+          "Ray", "Raymond", "Rebecca", "Regina", "Renee", "Rex", "Rhonda",
+          "Richard", "Rick", "Ricky", "Rita", "Rob", "Robbie", "Robert",
+          "Roberta", "Robin", "Rochelle", "Rocky", "Rod", "Rodney", "Roger",
+          "Ron", "Ronald", "Ronda", "Ronnie", "Rose", "Roxanne", "Roy", "Russ",
+          "Russell", "Rusty", "Ruth", "Ryan", "Sabrina", "Sally", "Sam",
+          "Samantha", "Samuel", "Sandra", "Sandy", "Sara", "Sarah", "Savannah",
+          "Scott", "Sean", "Seth", "Shanda", "Shane", "Shanna", "Shannon",
+          "Sharon", "Shaun", "Shawn", "Shawna", "Sheila", "Shelly", "Sher",
+          "Sherri", "Sherry", "Shirley", "Sierra", "Skyler", "Stacey", "Stacy",
+          "Stanley", "Stephanie", "Stephen", "Steve", "Steven", "Sue",
+          "Summer", "Susan", "Sydney", "Tabatha", "Tabitha", "Tamara", "Tammy",
+          "Tara", "Tasha", "Tashia", "Taylor", "Ted", "Teresa", "Terri",
+          "Terry", "Tessa", "Thelma", "Theresa", "Thomas", "Tia", "Tiffany",
+          "Tim", "Timmy", "Timothy", "Tina", "Todd", "Tom", "Tommy", "Toni",
+          "Tony", "Tonya", "Tracey", "Tracie", "Tracy", "Travis", "Trent",
+          "Trevor", "Trey", "Trisha", "Tristan", "Troy", "Tyler", "Tyrone",
+          "Unborn", "Valerie", "Vanessa", "Vernon", "Veronica", "Vicki",
+          "Vickie", "Vicky", "Victor", "Victoria", "Vincent", "Virginia",
+          "Vivian", "Walter", "Wanda", "Wayne", "Wendy", "Wesley", "Whitney",
+          "William", "Willie", "Wyatt", "Zachary")
+
+  private val lastNames = IndexedSeq("Abbott", "Acevedo", "Acosta", "Adams",
+          "Adkins", "Aguilar", "Aguirre", "Albert", "Alexander", "Alford",
+          "Allen", "Allison", "Alston", "Alvarado", "Alvarez", "Anderson",
+          "Andrews", "Anthony", "Armstrong", "Arnold", "Ashley", "Atkins",
+          "Atkinson", "Austin", "Avery", "Avila", "Ayala", "Ayers", "Bailey",
+          "Baird", "Baker", "Baldwin", "Ball", "Ballard", "Banks", "Barber",
+          "Smith", "Johnson", "Williams", "Jones", "Brown", "Davis", "Miller",
+          "Wilson", "Moore", "Taylor", "Thomas", "Jackson", "Barker", "Barlow",
+          "Barnes", "Barnett", "Barr", "Barrera", "Barrett", "Barron", "Barry",
+          "Bartlett", "Barton", "Bass", "Bates", "Battle", "Bauer", "Baxter",
+          "Beach", "Bean", "Beard", "Beasley", "Beck", "Becker", "Bell",
+          "Bender", "Benjamin", "Bennett", "Benson", "Bentley", "Benton",
+          "Berg", "Berger", "Bernard", "Berry", "Best", "Bird", "Bishop",
+          "Black", "Blackburn", "Blackwell", "Blair", "Blake", "Blanchard",
+          "Blankenship", "Blevins", "Bolton", "Bond", "Bonner", "Booker",
+          "Boone", "Booth", "Bowen", "Bowers", "Bowman", "Boyd", "Boyer",
+          "Boyle", "Bradford", "Bradley", "Bradshaw", "Brady", "Branch",
+          "Bray", "Brennan", "Brewer", "Bridges", "Briggs", "Bright", "Britt",
+          "Brock", "Brooks", "Browning", "Bruce", "Bryan", "Bryant",
+          "Buchanan", "Buck", "Buckley", "Buckner", "Bullock", "Burch",
+          "Burgess", "Burke", "Burks", "Burnett", "Burns", "Burris", "Burt",
+          "Burton", "Bush", "Butler", "Byers", "Byrd", "Cabrera", "Cain",
+          "Calderon", "Caldwell", "Calhoun", "Callahan", "Camacho", "Cameron",
+          "Campbell", "Campos", "Cannon", "Cantrell", "Cantu", "Cardenas",
+          "Carey", "Carlson", "Carney", "Carpenter", "Carr", "Carrillo",
+          "Carroll", "Carson", "Carter", "Carver", "Case", "Casey", "Cash",
+          "Castaneda", "Castillo", "Castro", "Cervantes", "Chambers", "Chan",
+          "Chandler", "Chaney", "Chang", "Chapman", "Charles", "Chase",
+          "Chavez", "Chen", "Cherry", "Christensen", "Christian", "Church",
+          "Clark", "Clarke", "Clay", "Clayton", "Clements", "Clemons",
+          "Cleveland", "Cline", "Cobb", "Cochran", "Coffey", "Cohen", "Cole",
+          "Coleman", "Collier", "Collins", "Colon", "Combs", "Compton",
+          "Conley", "Conner", "Conrad", "Contreras", "Conway", "Cook", "Cooke",
+          "Cooley", "Cooper", "Copeland", "Cortez", "Cote", "Cotton", "Cox",
+          "Craft", "Craig", "Crane", "Crawford", "Crosby", "Cross", "Cruz",
+          "Cummings", "Cunningham", "Curry", "Curtis", "Dale", "Dalton",
+          "Daniel", "Daniels", "Daugherty", "Davenport", "David", "Davidson",
+          "Dawson", "Day", "Dean", "Decker", "Dejesus", "Delacruz", "Delaney",
+          "Deleon", "Delgado", "Dennis", "Diaz", "Dickerson", "Dickinson",
+          "Dillard", "Dillon", "Dixon", "Dodson", "Dominguez", "Donaldson",
+          "Donovan", "Dorsey", "Dotson", "Douglas", "Downs", "Doyle", "Drake",
+          "Dudley", "Duffy", "Duke", "Duncan", "Dunlap", "Dunn", "Duran",
+          "Durham", "Dyer", "Eaton", "Edwards", "Elliott", "Ellis", "Ellison",
+          "Emerson", "England", "English", "Erickson", "Espinoza", "Estes",
+          "Estrada", "Evans", "Everett", "Ewing", "Farley", "Farmer",
+          "Farrell", "Faulkner", "Ferguson", "Fernandez", "Ferrell", "Fields",
+          "Figueroa", "Finch", "Finley", "Fischer", "Fisher", "Fitzgerald",
+          "Fitzpatrick", "Fleming", "Fletcher", "Flores", "Flowers", "Floyd",
+          "Flynn", "Foley", "Forbes", "Ford", "Foreman", "Foster", "Fowler",
+          "Fox", "Francis", "Franco", "Frank", "Franklin", "Franks", "Frazier",
+          "Frederick", "Freeman", "French", "Frost", "Fry", "Frye", "Fuentes",
+          "Fuller", "Fulton", "Gaines", "Gallagher", "Gallegos", "Galloway",
+          "Gamble", "Garcia", "Gardner", "Garner", "Garrett", "Garrison",
+          "Garza", "Gates", "Gay", "Gentry", "George", "Gibbs", "Gibson",
+          "Gilbert", "Giles", "Gill", "Gillespie", "Gilliam", "Gilmore",
+          "Glass", "Glenn", "Glover", "Goff", "Golden", "Gomez", "Gonzales",
+          "Gonzalez", "Good", "Goodman", "Goodwin", "Gordon", "Gould",
+          "Graham", "Grant", "Graves", "Gray", "Green", "Greene", "Greer",
+          "Gregory", "Griffin", "Griffith", "Grimes", "Gross", "Guerra",
+          "Guerrero", "Guthrie", "Gutierrez", "Guy", "Guzman", "Hahn", "Hale",
+          "Haley", "Hall", "Hamilton", "Hammond", "Hampton", "Hancock",
+          "Haney", "Hansen", "Hanson", "Hardin", "Harding", "Hardy", "Harmon",
+          "Harper", "Harris", "Harrington", "Harrison", "Hart", "Hartman",
+          "Harvey", "Hatfield", "Hawkins", "Hayden", "Hayes", "Haynes", "Hays",
+          "Head", "Heath", "Hebert", "Henderson", "Hendricks", "Hendrix",
+          "Henry", "Hensley", "Henson", "Herman", "Hernandez", "Herrera",
+          "Herring", "Hess", "Hester", "Hewitt", "Hickman", "Hicks", "Higgins",
+          "Hill", "Hines", "Hinton", "Hobbs", "Hodge", "Hodges", "Hoffman",
+          "Hogan", "Holcomb", "Holden", "Holder", "Holland", "Holloway",
+          "Holman", "Holmes", "Holt", "Hood", "Hooper", "Hoover", "Hopkins",
+          "Hopper", "Horn", "Horne", "Horton", "House", "Houston", "Howard",
+          "Howe", "Howell", "Hubbard", "Huber", "Hudson", "Huff", "Huffman",
+          "Hughes", "Hull", "Humphrey", "Hunt", "Hunter", "Hurley", "Hurst",
+          "Hutchinson", "Hyde", "Ingram", "Irwin", "Jacobs", "Jacobson",
+          "James", "Jarvis", "Jefferson", "Jenkins", "Jennings", "Jensen",
+          "Jimenez", "Johns", "Johnston", "Jordan", "Joseph", "Joyce",
+          "Joyner", "Juarez", "Justice", "Kane", "Kaufman", "Keith", "Keller",
+          "Kelley", "Kelly", "Kemp", "Kennedy", "Kent", "Kerr", "Key", "Kidd",
+          "Kim", "King", "Kinney", "Kirby", "Kirk", "Kirkland", "Klein",
+          "Kline", "Knapp", "Knight", "Knowles", "Knox", "Koch", "Kramer",
+          "Lamb", "Lambert", "Lancaster", "Landry", "Lane", "Lang", "Langley",
+          "Lara", "Larsen", "Larson", "Lawrence", "Lawson", "Le", "Leach",
+          "Leblanc", "Lee", "Leon", "Leonard", "Lester", "Levine", "Levy",
+          "Lewis", "Lindsay", "Lindsey", "Little", "Livingston", "Lloyd",
+          "Logan", "Long", "Lopez", "Lott", "Love", "Lowe", "Lowery", "Lucas",
+          "Luna", "Lynch", "Lynn", "Lyons", "Macdonald", "Macias", "Mack",
+          "Madden", "Maddox", "Maldonado", "Malone", "Mann", "Manning",
+          "Marks", "Marquez", "Marsh", "Marshall", "Martin", "Martinez",
+          "Mason", "Massey", "Mathews", "Mathis", "Matthews", "Maxwell", "May",
+          "Mayer", "Maynard", "Mayo", "Mays", "McBride", "McCall", "McCarthy",
+          "McCarty", "McClain", "McClure", "McConnell", "McCormick", "McCoy",
+          "McCray", "McCullough", "McDaniel", "McDonald", "McDowell",
+          "McFadden", "McFarland", "McGee", "McGowan", "McGuire", "McIntosh",
+          "McIntyre", "McKay", "McKee", "McKenzie", "McKinney", "McKnight",
+          "McLaughlin", "McLean", "McLeod", "McMahon", "McMillan", "McNeil",
+          "McPherson", "Meadows", "Medina", "Mejia", "Melendez", "Melton",
+          "Mendez", "Mendoza", "Mercado", "Mercer", "Merrill", "Merritt",
+          "Meyer", "Meyers", "Michael", "Middleton", "Miles", "Mills",
+          "Miranda", "Mitchell", "Molina", "Monroe", "Montgomery", "Montoya",
+          "Moody", "Moon", "Mooney", "Morales", "Moran", "Moreno", "Morgan",
+          "Morin", "Morris", "Morrison", "Morrow", "Morse", "Morton", "Moses",
+          "Mosley", "Moss", "Mueller", "Mullen", "Mullins", "Munoz", "Murphy",
+          "Murray", "Myers", "Nash", "Navarro", "Neal", "Nelson", "Newman",
+          "Newton", "Nguyen", "Nichols", "Nicholson", "Nielsen", "Nieves",
+          "Nixon", "Noble", "Noel", "Nolan", "Norman", "Norris", "Norton",
+          "Nunez", "Obrien", "Ochoa", "Oconnor", "Odom", "Odonnell", "Oliver",
+          "Olsen", "Olson", "O'neal", "O'neil", "O'neill", "Orr", "Ortega",
+          "Ortiz", "Osborn", "Osborne", "Owen", "Owens", "Pace", "Pacheco",
+          "Padilla", "Page", "Palmer", "Park", "Parker", "Parks", "Parrish",
+          "Parsons", "Pate", "Patel", "Patrick", "Patterson", "Patton", "Paul",
+          "Payne", "Pearson", "Peck", "Pena", "Pennington", "Perez", "Perkins",
+          "Perry", "Peters", "Petersen", "Peterson", "Petty", "Phelps",
+          "Phillips", "Pickett", "Pierce", "Pittman", "Pitts", "Pollard",
+          "Poole", "Pope", "Porter", "Potter", "Potts", "Powell", "Powers",
+          "Pratt", "Preston", "Price", "Prince", "Pruitt", "Puckett", "Pugh",
+          "Quinn", "Ramirez", "Ramos", "Ramsey", "Randall", "Randolph",
+          "Rasmussen", "Ratliff", "Ray", "Raymond", "Reed", "Reese", "Reeves",
+          "Reid", "Reilly", "Reyes", "Reynolds", "Rhodes", "Rice", "Rich",
+          "Richard", "Richards", "Richardson", "Richmond", "Riddle", "Riggs",
+          "Riley", "Rios", "Rivas", "Rivera", "Rivers", "Roach", "Robbins",
+          "Roberson", "Roberts", "Robertson", "Robinson", "Robles", "Rocha",
+          "Rodgers", "Rodriguez", "Rodriquez", "Rogers", "Rojas", "Rollins",
+          "Roman", "Romero", "Rosa", "Rosales", "Rosario", "Rose", "Ross",
+          "Roth", "Rowe", "Rowland", "Roy", "Ruiz", "Rush", "Russell", "Russo",
+          "Rutledge", "Ryan", "Salas", "Salazar", "Salinas", "Sampson",
+          "Sanchez", "Sanders", "Sandoval", "Sanford", "Santana", "Santiago",
+          "Santos", "Sargent", "Saunders", "Savage", "Sawyer", "Schmidt",
+          "Schneider", "Schroeder", "Schultz", "Schwartz", "Scott", "Sears",
+          "Sellers", "Serrano", "Sexton", "Shaffer", "Shannon", "Sharp",
+          "Sharpe", "Shaw", "Shelton", "Shepard", "Shepherd", "Sheppard",
+          "Sherman", "Shields", "Short", "Silva", "Simmons", "Simon",
+          "Simpson", "Sims", "Singleton", "Skinner", "Slater", "Sloan",
+          "Small", "Snider", "Snow", "Snyder", "Solis", "Solomon", "Sosa",
+          "Soto", "Sparks", "Spears", "Spence", "Spencer", "Stafford",
+          "Stanley", "Stanton", "Stark", "Steele", "Stein", "Stephens",
+          "Stephenson", "Stevens", "Stevenson", "Stewart", "Stokes", "Stone",
+          "Stout", "Strickland", "Strong", "Stuart", "Suarez", "Sullivan",
+          "Summers", "Sutton", "Swanson", "Sweeney", "Sweet", "Sykes",
+          "Talley", "Tanner", "Tate", "Terrell", "Terry", "Thompson",
+          "Thornton", "Tillman", "Todd", "Torres", "Townsend", "Tran",
+          "Travis", "Trevino", "Trujillo", "Tucker", "Turner", "Tyler",
+          "Tyson", "Underwood", "Valdez", "Valencia", "Valentine",
+          "Valenzuela", "Vance", "Vang", "Vargas", "Vasquez", "Vaughan",
+          "Vaughn", "Vazquez", "Vega", "Velasquez", "Velazquez", "Velez",
+          "Van halen", "Vincent", "Vinson", "Wade", "Wagner", "Walker", "Wall",
+          "Wallace", "Waller", "Walls", "Walsh", "Walter", "Walters", "Walton",
+          "Ward", "Ware", "Warner", "Warren", "Washington", "Waters",
+          "Watkins", "Watson", "Watts", "Weaver", "Webb", "Weber", "Webster",
+          "Weeks", "Weiss", "Welch", "Wells", "West", "Wheeler", "Whitaker",
+          "White", "Whitehead", "Whitfield", "Whitley", "Whitney", "Wiggins",
+          "Wilcox", "Wilder", "Wiley", "Wilkerson", "Wilkins", "Wilkinson",
+          "William", "Williamson", "Willis", "Winters", "Wise", "Witt", "Wolf",
+          "Wolfe", "Wong", "Wood", "Woodard", "Woods", "Woodward", "Wooten",
+          "Workman", "Wright", "Wyatt", "Wynn", "Yang", "Yates", "York",
+          "Young", "Zamora", "Zimmerman")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
new file mode 100644
index 0000000..534c606
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-mapreduce/src/main/scala/org/apache/bigtop/bigpetstore/generator/TransactionIteratorFactory.scala
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.util.Date
+import org.apache.bigtop.bigpetstore.generator.util.State
+import org.apache.commons.lang3.StringUtils
+import java.util.Arrays.asList
+import java.util.Random
+import scala.collection.Iterator
+import com.sun.org.apache.xml.internal.serializer.ToStream
+import java.util.{Iterator => JavaIterator}
+import scala.collection.JavaConversions.asJavaIterator
+import org.apache.bigtop.bigpetstore.generator.util.Product
+import org.apache.commons.lang3.Range;
+import org.apache.bigtop.bigpetstore.generator.util.ProductType
+
+/**
+ * This class generates our data. Over time we will use it to embed bias which
+ * can then be teased out, i.e. by clustering/classifiers. For example:
+ *
+ * certain products <--> certain years or days
+ */
+class TransactionIteratorFactory(private val records: Int,
+        private val customerIdRange: Range[java.lang.Long],
+        private val state: State) {
+  assert(records > 0, "Number of records must be greater than 0 to generate a 
data iterator!")
+  private val random = new Random(state.hashCode)
+
+  def data: JavaIterator[TransactionIteratorFactory.KeyVal[String, String]] = {
+    new TransactionIteratorFactory.DataIterator(records, customerIdRange, 
state, random)
+  }
+}
+
+object TransactionIteratorFactory {
+  class KeyVal[K, V](val key: K, val value: V)
+
+  private class DataIterator(records: Int,
+          customerIdRange: Range[java.lang.Long],
+          state: State,
+          r: Random) extends Iterator[KeyVal[String, String]] {
+    private var firstName: String = null
+    private var lastName: String = null
+    private var elementsProcducedCount = 0
+    private var repeatCount = 0
+    private var currentCustomerId = customerIdRange.getMinimum
+    private var currentProductType = selectRandomProductType;
+
+    def hasNext =
+      elementsProcducedCount < records && currentCustomerId <= 
customerIdRange.getMaximum
+
+
+    def next(): TransactionIteratorFactory.KeyVal[String,String] = {
+      val date = DataForger.randomDateInPastYears(50);
+      setIteratorState();
+
+      val product = randomProductOfCurrentlySelectedType
+      val key = StringUtils.join(asList("BigPetStore", "storeCode_" + 
state.name(),
+              elementsProcducedCount.toString), ",")
+      val value = StringUtils.join(asList(currentCustomerId, firstName, 
lastName, product.id,
+              product.name.toLowerCase, product.price, date), ",")
+
+      elementsProcducedCount += 1
+      new TransactionIteratorFactory.KeyVal(key, value)
+    }
+
+    private def setIteratorState() = {
+      /** Some customers come back for more :) We repeat a customer up to ten 
times */
+      if (repeatCount > 0) {
+        repeatCount -= 1
+      } else {
+        firstName = DataForger.firstName(r)
+        lastName = DataForger.lastName(r)
+        // this sometimes generates numbers much larger than 10. We don't 
really need Gaussian
+        // distribution since number of transactions per customer can be truly 
arbitrary.
+        repeatCount = (r.nextGaussian * 4f) toInt;
+        println("####Repeat: " + repeatCount)
+        currentCustomerId += 1
+        currentProductType = selectRandomProductType;
+      }
+    }
+
+    private def selectRandomProductType = {
+      ProductType.values.apply(r.nextInt(ProductType.values.length))
+    }
+
+    private def randomProductOfCurrentlySelectedType = {
+      
currentProductType.getProducts.get(r.nextInt(currentProductType.getProducts.size))
+    }
+  }
+}
\ No newline at end of file

Reply via email to