This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/master by this push:
     new ec44825fd  BIGTOP-4424. Rewrite BPS Spark with the DataFrame API and 
spark.ml. (#1350)
ec44825fd is described below

commit ec44825fd08bc26abd93474b107a3082dfb15c05
Author: Kengo Seki <[email protected]>
AuthorDate: Sat May 31 23:26:09 2025 +0900

     BIGTOP-4424. Rewrite BPS Spark with the DataFrame API and spark.ml. (#1350)
---
 bigtop-bigpetstore/bigpetstore-spark/README.md     |  13 +-
 bigtop-bigpetstore/bigpetstore-spark/build.gradle  |   4 +-
 .../spark/analytics/PetStoreStatistics.scala       | 160 ++++++----------
 .../spark/analytics/RecommendProducts.scala        | 110 +++++------
 .../bigpetstore/spark/datamodel/DataModel.scala    |  38 +---
 .../bigpetstore/spark/datamodel/IOUtils.scala      | 107 +++++++----
 .../org/apache/bigpetstore/spark/etl/ETL.scala     | 212 ++++++---------------
 .../bigpetstore/spark/generator/SparkDriver.scala  | 149 ++++++---------
 .../bigpetstore/spark/TestFullPipeline.scala       |  49 +++--
 .../bigpetstore/spark/datamodel/IOUtilsSuite.scala |  12 +-
 .../apache/bigpetstore/spark/etl/ETLSuite.scala    | 124 ++++--------
 .../spark/generator/SparkDriverSuite.scala         |  18 +-
 .../bigpetstore-transaction-queue/build.gradle     |   2 +-
 .../bigtop/bigpetstore/qstream/FileLoadGen.java    |   4 +-
 .../bigtop/bigpetstore/qstream/HttpLoadGen.java    |   2 +-
 .../apache/bigtop/bigpetstore/qstream/LoadGen.java |  26 +--
 .../apache/bigtop/bigpetstore/qstream/Utils.java   |   4 +-
 .../datagenerators/bigpetstore/DataLoader.java     |   2 +-
 .../bigpetstore/datamodels/inputs/InputData.java   |  15 ++
 bigtop-data-generators/build.gradle                |   9 +
 20 files changed, 439 insertions(+), 621 deletions(-)

diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md 
b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 93897cca5..e550e48f3 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -18,9 +18,9 @@ BigPetStore -- Spark
 ====================
 
 BigPetStore is a family of example applications for the Hadoop and Spark
-ecosystems.  BigPetStore is build around a fictional chain pet stores,
+ecosystems. BigPetStore is build around a fictional chain pet stores,
 providing generators for synthetic transaction data and pipelines for
-processing that data.  Each ecosystems has its own version of the
+processing that data. Each ecosystem has its own version of the
 application.
 
 The Spark application currently builds against Spark 3.5.4.
@@ -63,9 +63,8 @@ internal structured data model is defined as input for the 
analytics components:
 * Product(productId: Long, category: String, attributes: Map[String, String])
 * Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: 
java.util.Calendar, productId: Long)
 
-The ETL stage parses and cleans up the dirty CSV and writes out RDDs for each 
data type in the data model, serialized using
-the `saveAsObjectFile()` method. The analytics components can use the 
`IOUtils.load()` method to de-serialize the structured
-data.
+The ETL stage parses and cleans up the dirty CSV and writes out DataFrames for 
each data type in the data model as
+Parquet files. The analytics components can use the `IOUtils.load()` method to 
de-serialize the structured data.
 
 Running Tests
 -------------
@@ -106,7 +105,7 @@ The ETL component:
 * Reads the raw data
 * Parses the data times and products
 * Normalizes the data
-* Writes out RDDs for each type of class (Store, Customer, Location, Product, 
Transaction) in the data model
+* Writes out DataFrames for each type of class (Store, Customer, Location, 
Product, Transaction) in the data model
 
 After building the jar (see above), you can run the ETL component like so:
 
@@ -118,7 +117,7 @@ Running the SparkSQL component
 -------------------------------
 
 Once ETL'd we can now process the data and do analytics on it. The 
DataModel.scala class itself is used to read/write classes
-from files.  To run the analytics job, which outputs a JSON file at the end, 
you now will run the following:
+from files. To run the analytics job, which outputs a JSON file at the end, 
you now will run the following:
 
 ```
 spark-submit --master local[2] --class 
org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics 
bigpetstore-spark-X.jar transformed_data PetStoreStats.json
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle 
b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 34174a992..970065b48 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -91,14 +91,12 @@ dependencies {
     compile("org.apache.spark:spark-core_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-mllib_${scalaVersion}:${sparkVersion}")
     compile("org.apache.spark:spark-sql_${scalaVersion}:${sparkVersion}")
-    compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
-    compile "joda-time:joda-time:2.13.1"
+    compile "org.apache.bigtop:bigpetstore-data-generator:3.5.0-SNAPSHOT"
     compile "org.json4s:json4s-jackson_${scalaVersion}:3.6.12"
 
     testCompile "junit:junit:4.13.2"
     testCompile "org.scalatest:scalatest_${scalaVersion}:3.2.19"
     testCompile "org.scalatestplus:junit-4-13_${scalaVersion}:3.2.19.0"
-    testCompile "joda-time:joda-time:2.13.1"
 }
 
 eclipse {
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index 98bbcb42e..508dc3cf1 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -19,157 +19,119 @@ package org.apache.bigtop.bigpetstore.spark.analytics
 
 import java.io.File
 
-import scala.language.postfixOps
-
 import org.apache.spark.sql._
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.rdd._
 
 import org.apache.bigtop.bigpetstore.spark.datamodel._
 
 object PetStoreStatistics {
 
-    private def printUsage(): Unit = {
-      val usage: String = "BigPetStore Analytics Module." +
+  private def printUsage(): Unit = {
+    val usage: String = "BigPetStore Analytics Module." +
       "\n" +
       "Usage: spark-submit ... inputDir outputFile\n " +
       "inputDir - (string) Path to ETL'd data\n" +
       "outputFile - (string) is a JSON file.  For schema, see the code.\n"
 
-      System.err.println(usage)
-    }
+    System.err.println(usage)
+  }
 
   /**
    * Scala details. Some or None are an idiomatic way, in scala, to
    * return an optional value.  This allows us to signify, to the caller, that 
the
    * method may fail.  The caller can decide how to deal with failure (i.e. 
using getOrElse).
+   *
    * @param args
    * @return
    */
-    def parseArgs(args: Array[String]):(Option[String],Option[String]) = {
-      if(args.length < 1) {
-        (None, None)
-      } else if (args.length == 1) {
-        (Some(args(0)), None)
-      } else {
-        (Some(args(0)), Some(args(1)))
-      }
+  def parseArgs(args: Array[String]): (Option[String], Option[String]) = {
+    if (args.length < 1) {
+      (None, None)
+    } else if (args.length == 1) {
+      (Some(args(0)), None)
+    } else {
+      (Some(args(0)), Some(args(1)))
     }
-
-  def productMap(r:Array[Product]) : Map[Long,Product] = {
-    r map (prod => prod.productId -> prod) toMap
   }
 
-  def queryTxByMonth(sqlContext: SQLContext): Array[StatisticsTxByMonth] = {
-    import sqlContext._
-
-    val results: DataFrame = sql("SELECT count(*), month FROM Transactions 
GROUP BY month")
-    val transactionsByMonth = results.collect()
-    for(x<-transactionsByMonth){
-      println(x)
-    }
-
-    transactionsByMonth.map { r =>
-      StatisticsTxByMonth(r.getInt(1), r.getLong(0))
-    }
+  def queryTxByMonth(spark: SparkSession): Array[StatisticsTxByMonth] = {
+    import spark.implicits._
+    val results = spark.sql("SELECT month(dateTime) month, count(*) count FROM 
Transactions GROUP BY month")
+    results.show()
+    results.as[StatisticsTxByMonth].collect()
   }
 
-  def queryTxByProductZip(sqlContext: SQLContext): 
Array[StatisticsTxByProductZip] = {
-    import sqlContext._
-
-    val results: DataFrame = sql(
-      """SELECT count(*) c, productId, zipcode
-FROM Transactions t
-JOIN Stores s ON t.storeId = s.storeId
-GROUP BY productId, zipcode""")
-
-    val groupedProductZips = results.collect()
-
-    //get list of all transactionsData
-    for(x<-groupedProductZips){
-      println("grouped product:zip " + x)
-    }
-
-    //Map JDBC Row into a Serializable case class.
-    groupedProductZips.map { r =>
-      StatisticsTxByProductZip(r.getLong(1),r.getString(2),r.getLong(0))
-    }
+  def queryTxByProductZip(spark: SparkSession): 
Array[StatisticsTxByProductZip] = {
+    import spark.implicits._
+    val results: DataFrame = spark.sql(
+      """SELECT productId, zipcode, count(*) count
+        |FROM Transactions t
+        |JOIN Stores s
+        |ON t.storeId = s.storeId
+        |GROUP BY productId, zipcode""".stripMargin)
+    results.show()
+    results.as[StatisticsTxByProductZip].collect()
   }
 
-  def queryTxByProduct(sqlContext: SQLContext): Array[StatisticsTxByProduct] = 
{
-    import sqlContext._
-
-    val results: DataFrame = sql(
-      """SELECT count(*) c, productId FROM Transactions GROUP BY productId""")
-
-    val groupedProducts = results.collect()
-
-    //Map JDBC Row into a Serializable case class.
-    groupedProducts.map { r =>
-      StatisticsTxByProduct(r.getLong(1),r.getLong(0))
-    }
+  def queryTxByProduct(spark: SparkSession): Array[StatisticsTxByProduct] = {
+    import spark.implicits._
+    val results = spark.sql("SELECT productId, count(*) count FROM 
Transactions GROUP BY productId")
+    results.show()
+    results.as[StatisticsTxByProduct].collect()
   }
 
 
-  def runQueries(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product],
-    RDD[Transaction]), sc: SparkContext): Statistics = {
-
-    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+  def runQueries(spark: SparkSession, locationDF: DataFrame, storeDF: 
DataFrame,
+                 customerDF: DataFrame, productDF: DataFrame, transactionDF: 
DataFrame): Statistics = {
+    import spark.implicits._
 
-    // Transform the Non-SparkSQL Calendar into a SparkSQL-friendly field.
-    val mappableTransactions:RDD[TransactionSQL] =
-      r._5.map { trans => trans.toSQL() }
+    locationDF.createOrReplaceTempView("Locations")
+    storeDF.createOrReplaceTempView("Stores")
+    customerDF.createOrReplaceTempView("Customers")
+    productDF.createOrReplaceTempView("Product")
+    transactionDF.createOrReplaceTempView("Transactions")
 
-    spark.createDataFrame(r._1).toDF().createOrReplaceTempView("Locations")
-    spark.createDataFrame(r._2).createOrReplaceTempView("Stores")
-    spark.createDataFrame(r._3).createOrReplaceTempView("Customers")
-    spark.createDataFrame(r._4).createOrReplaceTempView("Product")
-    
spark.createDataFrame(mappableTransactions).createOrReplaceTempView("Transactions")
-
-
-    val txByMonth = queryTxByMonth(spark.sqlContext)
-    val txByProduct = queryTxByProduct(spark.sqlContext)
-    val txByProductZip = queryTxByProductZip(spark.sqlContext)
+    val txByMonth = queryTxByMonth(spark)
+    val txByProduct = queryTxByProduct(spark)
+    val txByProductZip = queryTxByProductZip(spark)
 
     Statistics(
-      txByMonth.map { s => s.count }.reduce(_+_),  // Total number of 
transactions
+      txByMonth.map(_.count).sum, // Total number of transactions
       txByMonth,
       txByProduct,
       txByProductZip,
-      r._4.collect()) // Product details
+      productDF.as[Product].collect()) // Product details
   }
 
-    /**
-    * We keep a "run" method which can be called easily from tests and also is 
used by main.
-    */
-    def run(txInputDir:String, statsOutputFile:String,
-      sc:SparkContext): Unit = {
+  /**
+   * We keep a "run" method which can be called easily from tests and also is 
used by main.
+   */
+  def run(txInputDir: String, statsOutputFile: String, spark: SparkSession): 
Unit = {
 
-      System.out.println("Running w/ input = " + txInputDir)
+    System.out.println("Running w/ input = " + txInputDir)
 
-      System.out.println("input : " + txInputDir)
-      val etlData = IOUtils.load(sc, txInputDir)
+    System.out.println("input : " + txInputDir)
+    val (locationDF, storeDF, customerDF, productDF, transactionDF) = 
IOUtils.load(spark, txInputDir)
 
-      val stats = runQueries(etlData, sc)
+    val stats = runQueries(spark, locationDF, storeDF, customerDF, productDF, 
transactionDF)
 
-      IOUtils.saveLocalAsJSON(new File(statsOutputFile), stats)
+    IOUtils.saveLocalAsJSON(new File(statsOutputFile), stats)
 
-      System.out.println("Output JSON Stats stored : " + statsOutputFile)
-    }
+    System.out.println("Output JSON Stats stored : " + statsOutputFile)
+  }
 
   def main(args: Array[String]): Unit = {
     // Get or else : On failure (else) we exit.
-    val (inputPath,outputPath) = parseArgs(args)
+    val (inputPath, outputPath) = parseArgs(args)
 
-    if(! (inputPath.isDefined && outputPath.isDefined)) {
+    if (!(inputPath.isDefined && outputPath.isDefined)) {
       printUsage()
       System.exit(1)
     }
 
-    val sc = new SparkContext(new SparkConf().setAppName("PetStoreStatistics"))
+    val spark = 
SparkSession.builder.appName("PetStoreStatistics").getOrCreate()
 
-    run(inputPath.get, outputPath.get, sc)
+    run(inputPath.get, outputPath.get, spark)
 
-    sc.stop()
+    spark.stop()
   }
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
index fb9b8a7a1..57a6bd0ff 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
@@ -18,29 +18,28 @@
 package org.apache.bigtop.bigpetstore.spark.analytics
 
 import org.apache.bigtop.bigpetstore.spark.datamodel._
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.rdd._
-import org.apache.spark.mllib.recommendation._
+import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import java.io.File
 
-case class PRParameters(inputDir: String, outputFile: String)
-
 object RecommendProducts {
 
+  case class PRParameters(inputDir: String, outputFile: String)
+
   private def printUsage(): Unit = {
     val usage = "BigPetStore Product Recommendation Module\n" +
-    "\n" +
-    "Usage: transformed_data recommendations\n" +
-    "\n" +
-    "transformed_data - (string) directory of ETL'd data\n" +
-    "recommendations - (string) output file of recommendations\n"
+      "\n" +
+      "Usage: transformed_data recommendations\n" +
+      "\n" +
+      "transformed_data - (string) directory of ETL'd data\n" +
+      "recommendations - (string) output file of recommendations\n"
 
     println(usage)
   }
 
   def parseArgsOrDie(args: Array[String]): PRParameters = {
-    if(args.length != 2) {
+    if (args.length != 2) {
       printUsage();
       System.exit(1)
     }
@@ -48,57 +47,54 @@ object RecommendProducts {
     PRParameters(args(0), args(1))
   }
 
-  def prepareRatings(tx: RDD[Transaction]): RDD[Rating] = {
-    val productPairs = tx.map { t => ((t.customerId, t.productId), 1) }
-    val pairCounts = productPairs.reduceByKey { (v1, v2) => v1 + v2 }
-    val ratings = pairCounts.map { p => Rating(p._1._1.toInt, p._1._2.toInt, 
p._2) }
-
-    ratings
-  }
-
-  def trainModel(ratings: RDD[Rating], nIterations: Int, rank: Int, alpha: 
Double,
-    lambda: Double): MatrixFactorizationModel = {
-    ratings.cache()
-    val model = ALS.trainImplicit(ratings, nIterations, rank, lambda, alpha)
-
-    model
+  def prepareRatings(spark: SparkSession, tx: DataFrame): DataFrame = {
+    tx.createOrReplaceTempView("Transactions")
+    spark.sql("SELECT customerId, productId, count(*) rating FROM Transactions 
GROUP BY customerId, productId")
   }
 
-  def recommendProducts(customers: RDD[Customer],
-    model: MatrixFactorizationModel, sc: SparkContext, nRecommendations: Int):
-      Array[UserProductRecommendations] = {
-
-    customers.collect().map { c =>
-      val ratings = model.recommendProducts(c.customerId.toInt, 
nRecommendations)
-
-      val productIds = ratings.map { r => r.product.toLong}
-
-      UserProductRecommendations(c.customerId, productIds.toArray)
-    }
+  def trainModel(ratings: DataFrame, nIterations: Int, rank: Int, alpha: 
Double, lambda: Double) =
+    new ALS().setImplicitPrefs(true)
+      .setAlpha(alpha)
+      .setMaxIter(nIterations)
+      .setRank(rank)
+      .setRegParam(lambda)
+      .setUserCol("customerId")
+      .setItemCol("productId")
+      .setRatingCol("rating")
+      .fit(ratings.cache())
+
+  def recommendProducts(spark: SparkSession, model: ALSModel, 
nRecommendations: Int) = {
+    
model.recommendForAllUsers(nRecommendations).createOrReplaceTempView("Recommendations")
+    import spark.implicits._
+    spark.sql(
+      """SELECT customerId, collect_list(productId) productIds
+        |FROM (
+        |  SELECT customerId, productId, rating
+        |  FROM (SELECT customerId, inline(recommendations) FROM 
Recommendations)
+        |  DISTRIBUTE BY customerId SORT BY rating DESC
+        |)
+        |GROUP BY customerId
+        |""".stripMargin).as[UserProductRecommendations].collect()
   }
 
   /**
-    * We keep a "run" method which can be called easily from tests and also is 
used by main.
-    */
-  def run(txInputDir: String, recOutputFile: String, sc: SparkContext,
-  nIterations: Int = 20, alpha: Double = 40.0, rank:Int = 10, lambda: Double = 
1.0,
-  nRecommendations: Int = 5): Unit = {
+   * We keep a "run" method which can be called easily from tests and also is 
used by main.
+   */
+  def run(txInputDir: String, recOutputFile: String, spark: SparkSession,
+          nIterations: Int = 20, alpha: Double = 40.0, rank: Int = 10, lambda: 
Double = 1.0,
+          nRecommendations: Int = 5): Unit = {
 
     println("input : " + txInputDir)
-    println(sc)
-    val rdds = IOUtils.load(sc, txInputDir)
-    val tx = rdds._5
-    val products = rdds._4
-    val customers = rdds._3
-    System.out.println("Transaction count = " + tx.count())
-
-    val ratings = prepareRatings(tx)
+    println(spark)
+    val (_, _, customerDF, productDF, transactionDF) = IOUtils.load(spark, 
txInputDir)
+    System.out.println("Transaction count = " + transactionDF.count())
+
+    val ratings = prepareRatings(spark, transactionDF)
     val model = trainModel(ratings, nIterations, rank, alpha, lambda)
-    val userProdRec = recommendProducts(customers, model, sc, nRecommendations)
+    val userProdRec = recommendProducts(spark, model, nRecommendations)
 
-    val prodRec = ProductRecommendations(customers.collect(),
-      products.collect(),
-      userProdRec)
+    import spark.implicits._
+    val prodRec = ProductRecommendations(customerDF.as[Customer].collect(), 
productDF.as[Product].collect(), userProdRec)
 
     IOUtils.saveLocalAsJSON(new File(recOutputFile), prodRec)
   }
@@ -106,12 +102,10 @@ object RecommendProducts {
   def main(args: Array[String]): Unit = {
     val params: PRParameters = parseArgsOrDie(args)
 
-    val conf = new SparkConf().setAppName("BPS Product Recommendations")
-    val sc = new SparkContext(conf)
+    val spark = SparkSession.builder.appName("BPS Product 
Recommendations").getOrCreate()
 
-    run(params.inputDir, params.outputFile, sc)
+    run(params.inputDir, params.outputFile, spark)
 
-    sc.stop()
+    spark.stop()
   }
-
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 71edd7f5b..545c7f9bf 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -18,17 +18,13 @@
 package org.apache.bigtop.bigpetstore.spark.datamodel
 
 import java.sql.Timestamp
-import java.util.Calendar
-
-import org.joda.time.DateTime
 
 /**
  * Statistics phase.  Represents JSON for a front end.
  */
-
 case class StatisticsTxByMonth(month: Int, count: Long)
 
-case class StatisticsTxByProductZip(productId:Long, zipcode:String, count:Long)
+case class StatisticsTxByProductZip(productId: Long, zipcode: String, count: 
Long)
 
 case class StatisticsTxByProduct(count: Long, productId: Long)
 
@@ -36,10 +32,14 @@ case class Statistics(totalTransactions: Long,
   transactionsByMonth: Array[StatisticsTxByMonth],
   transactionsByProduct: Array[StatisticsTxByProduct],
   transactionsByProductZip: Array[StatisticsTxByProductZip],
-  productDetails:Array[Product])
+  productDetails: Array[Product])
 
-case class Customer(customerId: Long, firstName: String,
-  lastName: String, zipcode: String)
+case class RawData(storeId: Long, storeZipcode: String, storeCity: String, 
storeState: String,
+  customerId: Long, firstName: String, lastName: String,
+  customerZipcode: String, customerCity: String, customerState: String,
+  txId: Long, txDate: Timestamp, txProduct: String)
+
+case class Customer(customerId: Long, firstName: String, lastName: String, 
zipcode: String)
 
 case class Location(zipcode: String, city: String, state: String)
 
@@ -47,27 +47,7 @@ case class Product(productId: Long, category: String, 
attributes: Map[String, St
 
 case class Store(storeId: Long, zipcode: String)
 
-case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Calendar, productId: Long){
-
-  /**
-   * Convert to TransactionSQL.
-   * There possibly could be a conversion.
-   */
-  def toSQL(): TransactionSQL = {
-    val dt = new DateTime(dateTime)
-    TransactionSQL(customerId,transactionId,storeId,
-      new Timestamp(
-        new DateTime(dateTime).getMillis),
-        productId,
-        
dt.getYearOfEra,dt.getMonthOfYear,dt.getDayOfMonth,dt.getHourOfDay,dt.getMinuteOfHour)
-  }
-}
-
-/**
- * A Transaction which we can create from the natively stored transactions.
- */
-case class TransactionSQL(customerId: Long, transactionId: Long, storeId: 
Long, timestamp:Timestamp, productId: Long,
-                          year:Int, month:Int, day:Int, hour:Int, minute:Int )
+case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Timestamp, productId: Long)
 
 case class UserProductRecommendations(customerId: Long, productIds: 
Array[Long])
 
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index d28756dc6..f866665e7 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -20,17 +20,16 @@ package org.apache.bigtop.bigpetstore.spark.datamodel
 import java.io.File
 import java.nio.file.Files
 import java.nio.charset.StandardCharsets
-
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd._
-
+import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
 import org.json4s.jackson.Serialization
 import org.json4s._
 import org.json4s.jackson.Serialization.{read, write}
 
 /**
-  * Utility functions for loading and saving data model RDDs.
-  */
+ * Utility functions for loading and saving data model RDDs.
+ */
 object IOUtils {
   private val LOCATION_DIR = "locations"
   private val STORE_DIR = "stores"
@@ -38,21 +37,19 @@ object IOUtils {
   private val PRODUCT_DIR = "products"
   private val TRANSACTION_DIR = "transactions"
 
-  private val ANALYTICS_STATS_DIR = "analytics_stats"
-
   /**
-    * Save RDDs of the data model as Sequence files.
-    *
-    * @param outputDir Output directory
-    * @param locationRDD RDD of Location objects
-    * @param storeRDD RDD of Store objects
-    * @param customerRDD RDD of Customer objects
-    * @param productRDD RDD of Product objects
-    * @param transactionRDD RDD of Transaction objects
-    */
+   * Save RDDs of the data model as Sequence files.
+   *
+   * @param outputDir      Output directory
+   * @param locationRDD    RDD of Location objects
+   * @param storeRDD       RDD of Store objects
+   * @param customerRDD    RDD of Customer objects
+   * @param productRDD     RDD of Product objects
+   * @param transactionRDD RDD of Transaction objects
+   */
   def save(outputDir: String, locationRDD: RDD[Location],
-    storeRDD: RDD[Store], customerRDD: RDD[Customer],
-    productRDD: RDD[Product], transactionRDD: RDD[Transaction]): Unit = {
+           storeRDD: RDD[Store], customerRDD: RDD[Customer],
+           productRDD: RDD[Product], transactionRDD: RDD[Transaction]): Unit = 
{
 
     locationRDD.saveAsObjectFile(outputDir + "/" + LOCATION_DIR)
     storeRDD.saveAsObjectFile(outputDir + "/" + STORE_DIR)
@@ -61,45 +58,57 @@ object IOUtils {
     transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR)
   }
 
+  /**
+   * Save DataFrames of the data model as Parquet files.
+   *
+   * @param outputDir     Output directory
+   * @param locationDF    DataFrame of Location objects
+   * @param storeDF       DataFrame of Store objects
+   * @param customerDF    DataFrame of Customer objects
+   * @param productDF     DataFrame of Product objects
+   * @param transactionDF DataFrame of Transaction objects
+   */
+  def save(outputDir: String, locationDF: DataFrame,
+           storeDF: DataFrame, customerDF: DataFrame,
+           productDF: DataFrame, transactionDF: DataFrame): Unit = {
+
+    locationDF.write.parquet(outputDir + "/" + LOCATION_DIR)
+    storeDF.write.parquet(outputDir + "/" + STORE_DIR)
+    customerDF.write.parquet(outputDir + "/" + CUSTOMER_DIR)
+    productDF.write.parquet(outputDir + "/" + PRODUCT_DIR)
+    transactionDF.write.parquet(outputDir + "/" + TRANSACTION_DIR)
+  }
+
   def saveLocalAsJSON(outputDir: File, statistics: Statistics): Unit = {
     //load the write/read methods.
     implicit val formats = Serialization.formats(NoTypeHints)
-    val json:String = write(statistics)
+    val json: String = write(statistics)
     Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
   }
 
-  def readLocalAsStatistics(jsonFile: File):Statistics = {
+  def readLocalAsStatistics(jsonFile: File): Statistics = {
     //load the write/read methods.
     implicit val formats = Serialization.formats(NoTypeHints)
     //Read file as String, and serialize it into Stats object.
     //See http://json4s.org/ examples.
-    
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines().reduceLeft(_+_))
+    
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines().reduceLeft(_ + 
_))
   }
 
-  def saveLocalAsJSON(outputDir: File, 
recommendations:ProductRecommendations): Unit = {
+  def saveLocalAsJSON(outputDir: File, recommendations: 
ProductRecommendations): Unit = {
     //load the write/read methods.
     implicit val formats = Serialization.formats(NoTypeHints)
-    val json:String = write(recommendations)
+    val json: String = write(recommendations)
     Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
   }
 
-  def readLocalAsProductRecommendations(jsonFile: File):ProductRecommendations 
= {
-    //load the write/read methods.
-    implicit val formats = Serialization.formats(NoTypeHints)
-    //Read file as String, and serialize it into Stats object.
-    //See http://json4s.org/ examples.
-    
read[ProductRecommendations](scala.io.Source.fromFile(jsonFile).getLines().reduceLeft(_+_))
-  }
-
-
   /**
-    * Load RDDs of the data model from Sequence files.
-    *
-    * @param sc SparkContext
-    * @param inputDir Directory containing Sequence files
-    *
-    * TODO Should take path, not string, this makes input validation complex.
-    */
+   * Load RDDs of the data model from Sequence files.
+   *
+   * @param sc       SparkContext
+   * @param inputDir Directory containing Sequence files
+   *
+   *                 TODO Should take path, not string, this makes input 
validation complex.
+   */
   def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store],
     RDD[Customer], RDD[Product], RDD[Transaction]) = {
 
@@ -121,4 +130,26 @@ object IOUtils {
     (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
   }
 
+  /**
+   * Load DataFrames of the data model from Parquet files.
+   *
+   * @param spark    SparkSession
+   * @param inputDir Directory containing Parquet files
+   *
+   *                 TODO Should take path, not string, this makes input 
validation complex.
+   */
+  def load(spark: SparkSession, inputDir: String) = {
+
+    val locationDF = 
spark.read.schema(Encoders.product[Location].schema).parquet(inputDir + "/" + 
LOCATION_DIR)
+
+    val storeDF = 
spark.read.schema(Encoders.product[Store].schema).parquet(inputDir + "/" + 
STORE_DIR)
+
+    val customerDF = 
spark.read.schema(Encoders.product[Customer].schema).parquet(inputDir + "/" + 
CUSTOMER_DIR)
+
+    val productDF = 
spark.read.schema(Encoders.product[Product].schema).parquet(inputDir + "/" + 
PRODUCT_DIR)
+
+    val transactionDF = 
spark.read.schema(Encoders.product[Transaction].schema).parquet(inputDir + "/" 
+ TRANSACTION_DIR)
+
+    (locationDF, storeDF, customerDF, productDF, transactionDF)
+  }
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
index 3f046a66d..d4d30bb42 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
@@ -17,16 +17,8 @@
 
 package org.apache.bigtop.bigpetstore.spark.etl
 
-import org.apache.bigtop.bigpetstore.spark.datamodel._
-
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.rdd._
-
-import java.text.SimpleDateFormat
-import java.util._
-
-case class TransactionProduct(customerId: Long, transactionId: Long,
-  storeId: Long, dateTime: Calendar, product: String)
+import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, RawData}
+import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
 
 case class ETLParameters(inputDir: String, outputDir: String)
 
@@ -46,7 +38,7 @@ object SparkETL {
   }
 
   def parseArgs(args: Array[String]): ETLParameters = {
-    if(args.length != NPARAMS) {
+    if (args.length != NPARAMS) {
       printUsage()
       System.exit(1)
     }
@@ -54,162 +46,70 @@ object SparkETL {
     ETLParameters(args(0), args(1))
   }
 
-  def readRawData(sc: SparkContext, inputDir: String): RDD[String] = {
-    val rawRecords = sc.textFile(inputDir + "/transactions")
-      .flatMap(_.split("\n"))
-
-    rawRecords
-  }
-
-  def parseRawData(rawRecords: RDD[String]):
-      RDD[(Store, Location, Customer, Location, TransactionProduct)] = {
-    val splitRecords = rawRecords.map { r =>
-      val cols = r.split(",")
-
-      val storeId = cols(0).toInt
-      val storeZipcode = cols(1)
-      val storeCity = cols(2)
-      val storeState = cols(3)
-
-      val storeLocation = Location(storeZipcode, storeCity, storeState)
-      val store = Store(storeId, storeZipcode)
-
-      val customerId = cols(4).toInt
-      val firstName = cols(5)
-      val lastName = cols(6)
-      val customerZipcode = cols(7)
-      val customerCity = cols(8)
-      val customerState = cols(9)
-
-      val customerLocation = Location(customerZipcode, customerCity,
-        customerState)
-      val customer = Customer(customerId, firstName, lastName,
-        customerZipcode)
-
-      val txId = cols(10).toInt
-      val df = new SimpleDateFormat("EEE MMM dd kk:mm:ss z yyyy", Locale.US)
-      val txDate = df.parse(cols(11))
-      val txCal = 
Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
-      txCal.setTime(txDate)
-      txCal.set(Calendar.MILLISECOND, 0)
-      val txProduct = cols(12)
-
-      val transaction = TransactionProduct(customerId, txId,
-        storeId, txCal, txProduct)
-
-      (store, storeLocation, customer, customerLocation, transaction)
-    }
-
-    splitRecords
-  }
-
-  def normalizeData(rawRecords: RDD[(Store, Location, Customer,
-    Location, TransactionProduct)]): (RDD[Location], RDD[Store],
-      RDD[Customer], RDD[Product], RDD[Transaction]) = {
-    // extract stores
-    val storeRDD = rawRecords.map {
-      case (store, _, _, _, _) =>
-        store
-    }.distinct()
-
-    // extract store locations
-    val storeLocationRDD = rawRecords.map {
-      case (_, location, _, _, _) =>
-        location
-    }.distinct()
-
-    // extract customers
-    val customerRDD = rawRecords.map {
-      case (_, _, customer, _, _) =>
-        customer
-    }.distinct()
-
-    // extract customer locations
-    val customerLocationRDD = rawRecords.map {
-      case (_, _, _, location, _) =>
-        location
-    }.distinct()
-
-    // extract and normalize products
-    val productStringRDD = rawRecords.map {
-      case (_, _, _, _, tx) =>
-        tx.product
-    }
-    .distinct()
-    .zipWithUniqueId()
-
-    val productRDD = productStringRDD.map {
-      case (productString, id) =>
-        // products are key-value pairs of the form:
-        // key=value;key=value;
-        val prodKV = productString
-          .split(";")
-          .filter(_.trim().length > 0)
-          .map { pair =>
-            val pairString = pair.split("=")
-            (pairString(0), pairString(1))
-           }
-          .toMap
-
-        Product(id, prodKV("category"), prodKV)
-    }
-
-    // extract transactions, map products to productIds
-    val productTransactionRDD = rawRecords.map {
-      case (_, _, _, _, tx) =>
-       (tx.product, tx)
-    }
-
-    val joinedRDD: RDD[(String, (TransactionProduct, Long))]
-      = productTransactionRDD.join(productStringRDD)
-
-    val transactionRDD = joinedRDD.map {
-      case (productString, (tx, productId)) =>
-        Transaction(tx.customerId, tx.transactionId,
-          tx.storeId, tx.dateTime, productId)
-    }
-
-    val locationRDD = storeLocationRDD.
-      union(customerLocationRDD).
-      distinct()
-
-    (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
+  def readRawData(spark: SparkSession, inputDir: String) =
+    spark.read.option("timestampFormat", "EEE MMM dd kk:mm:ss z 
yyyy").schema(Encoders.product[RawData].schema).csv(inputDir + "/transactions")
+
+  def normalizeData(spark: SparkSession, rawDF: DataFrame) = {
+    rawDF.createOrReplaceTempView("RawData")
+
+    val storeDF = spark.sql("SELECT DISTINCT storeId, storeZipcode zipcode 
FROM RawData")
+
+    val locationDF = spark.sql(
+      """SELECT DISTINCT(*) FROM (
+        |  SELECT storeZipcode zipcode, storeCity city, storeState state FROM 
RawData
+        |  UNION
+        |  SELECT customerZipcode zipcode, customerCity city, customerState 
state FROM RawData
+        |)
+        |""".stripMargin)
+
+    val customerDF = spark.sql("SELECT DISTINCT customerId, firstName, 
lastName, customerZipcode zipcode FROM RawData")
+
+    val indexedRawProductDF = spark.sql(
+      """SELECT cast(ROW_NUMBER() OVER (ORDER BY product) as BIGINT) 
productId, product
+        |FROM (SELECT DISTINCT txProduct product FROM RawData)
+        |""".stripMargin)
+    indexedRawProductDF.createOrReplaceTempView("Product")
+
+    val transactionDF = spark.sql(
+      """SELECT customerId, txId transactionId, storeId, txDate dateTime, 
productId
+        |FROM RawData a
+        |JOIN Product b
+        |  ON a.txProduct = b.product
+        |""".stripMargin)
+
+    val productDF = spark.sql(
+      """SELECT productId, attributes['category'] category, attributes
+        |FROM (
+        |  SELECT productId, map_filter(str_to_map(product, ';', '='), (k, v) 
-> 0 < length(k)) attributes
+        |  FROM Product
+        |)
+        |""".stripMargin)
+
+    (locationDF, storeDF, customerDF, productDF, transactionDF)
   }
 
   /**
-   * Runs the ETL and returns the total number of 
locations,stores,customers,products,transactions.
+   * Runs the ETL and returns the total number of locations, stores, 
customers, products, transactions.
    */
-  def run(sc:SparkContext, parameters:ETLParameters) : 
(Long,Long,Long,Long,Long) = {
-    val rawStringRDD = readRawData(sc, parameters.inputDir)
-    val rawRecordRDD = parseRawData(rawStringRDD)
-    val normalizedRDDs = normalizeData(rawRecordRDD)
-
-    val locationRDD = normalizedRDDs._1
-    val storeRDD = normalizedRDDs._2
-    val customerRDD = normalizedRDDs._3
-    val productRDD = normalizedRDDs._4
-    val transactionRDD = normalizedRDDs._5
-
-    IOUtils.save(parameters.outputDir, locationRDD, storeRDD,
-      customerRDD, productRDD, transactionRDD)
-
-    (locationRDD.count(), storeRDD.count(), customerRDD.count(),
-      productRDD.count(), transactionRDD.count())
+  def run(spark: SparkSession, parameters: ETLParameters): (Long, Long, Long, 
Long, Long) = {
+    val rawDF = readRawData(spark, parameters.inputDir)
+    val (locationDF, storeDF, customerDF, productDF, transactionDF) = 
normalizeData(spark, rawDF)
+
+    IOUtils.save(parameters.outputDir, locationDF, storeDF, customerDF, 
productDF, transactionDF)
+
+    (locationDF.count(), storeDF.count(), customerDF.count(), 
productDF.count(), transactionDF.count())
   }
 
   def main(args: Array[String]): Unit = {
     val parameters = parseArgs(args)
 
-    println("Creating SparkConf")
-
-    val conf = new SparkConf().setAppName("BPS Data Generator")
-
-    println("Creating SparkContext")
+    println("Creating SparkSession")
 
-    val sc = new SparkContext(conf)
+    val spark = SparkSession.builder.appName("BPS Data Generator")
+      .config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()
 
-    run(sc, parameters)
+    run(spark, parameters)
 
-    sc.stop()
+    spark.stop()
   }
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
index 6169a0039..28c817297 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala
@@ -17,15 +17,14 @@
 
 package org.apache.bigtop.bigpetstore.spark.generator
 
-import com.github.rnowling.bps.datagenerator.datamodels._
-import com.github.rnowling.bps.datagenerator.{DataLoader, 
PurchasingProfileGenerator, StoreGenerator, TransactionGenerator, 
CustomerGenerator => CustGen}
-import com.github.rnowling.bps.datagenerator.framework.SeedFactory
+import org.apache.bigtop.datagenerators.bigpetstore.datamodels._
+import org.apache.bigtop.datagenerators.bigpetstore._
+import org.apache.bigtop.datagenerators.samplers.SeedFactory
 
 import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd._
 
-import java.util.ArrayList
 import java.util.Date
 import scala.util.Random
 
@@ -37,7 +36,6 @@ import scala.util.Random
  * is stringified into what is often called a "line item".
  *
  * Then, spark writes those line items out as a distributed hadoop file glob.
- *
  */
 object SparkDriver {
   private var nStores: Int = -1
@@ -51,17 +49,17 @@ object SparkDriver {
   private def printUsage(): Unit = {
     val usage: String =
       "BigPetStore Data Generator.\n" +
-      "Usage: spark-submit ... outputDir nStores nCustomers simulationLength 
[seed]\n" +
-      "outputDir - (string) directory to write files\n" +
-      "nStores - (int) number of stores to generate\n" +
-      "nCustomers - (int) number of customers to generate\n" +
-      "simulationLength - (float) number of days to simulate\n" +
-      "seed - (long) seed for RNG. If not given, one is reandomly generated.\n"
+        "Usage: spark-submit ... outputDir nStores nCustomers simulationLength 
[seed]\n" +
+        "outputDir - (string) directory to write files\n" +
+        "nStores - (int) number of stores to generate\n" +
+        "nCustomers - (int) number of customers to generate\n" +
+        "simulationLength - (float) number of days to simulate\n" +
+        "seed - (long) seed for RNG. If not given, one is randomly 
generated.\n"
     System.err.println(usage)
   }
 
   def parseArgs(args: Array[String]): Unit = {
-    if(args.length != NPARAMS && args.length != (NPARAMS - 1)) {
+    if (args.length != NPARAMS && args.length != (NPARAMS - 1)) {
       printUsage()
       System.exit(1)
     }
@@ -70,7 +68,7 @@ object SparkDriver {
       nStores = args(1).toInt
     }
     catch {
-      case _ : NumberFormatException =>
+      case _: NumberFormatException =>
         System.err.println("Unable to parse '" + args(1) + "' as an integer 
for nStores.\n")
         printUsage()
         System.exit(1)
@@ -79,7 +77,7 @@ object SparkDriver {
       nCustomers = args(2).toInt
     }
     catch {
-      case _ : NumberFormatException =>
+      case _: NumberFormatException =>
         System.err.println("Unable to parse '" + args(2) + "' as an integer 
for nCustomers.\n")
         printUsage()
         System.exit(1)
@@ -88,26 +86,26 @@ object SparkDriver {
       simulationLength = args(3).toDouble
     }
     catch {
-      case _ : NumberFormatException =>
+      case _: NumberFormatException =>
         System.err.println("Unable to parse '" + args(3) + "' as a float for 
simulationLength.\n")
         printUsage()
         System.exit(1)
     }
 
-    //If seed isnt present, then no is used seed.
-    if(args.length == NPARAMS) {
+    //If seed isn't present, then no is used seed.
+    if (args.length == NPARAMS) {
       try {
         seed = args(4).toLong
       }
       catch {
-        case _ : NumberFormatException =>
+        case _: NumberFormatException =>
           System.err.println("Unable to parse '" + args(4) + "' as a long for 
seed.\n")
           printUsage()
           System.exit(1)
       }
     }
     else {
-      seed = (new Random()).nextLong()
+      seed = new Random().nextLong()
     }
   }
 
@@ -121,62 +119,49 @@ object SparkDriver {
     val seedFactory = new SeedFactory(seed)
 
     println("Generating stores...")
-    val stores : ArrayList[Store] = new ArrayList()
     val storeGenerator = new StoreGenerator(inputData, seedFactory)
-    for(i <- 1 to nStores) {
-      val store = storeGenerator.generate()
-      stores.add(store)
-    }
-    println("Done.")
+    val stores = (1 to nStores).foldLeft(Nil: List[Store])((stores, _) => 
storeGenerator.generate() :: stores)
+    println("...Done generating stores.")
 
     println("Generating customers...")
-    var customers: List[Customer] = List()
-    val custGen = new CustGen(inputData, stores, seedFactory)
-    for(i <- 1 to nCustomers) {
-      val customer = custGen.generate()
-      customers = customer :: customers
-    }
+    val customerGenerator = new CustomerGenerator(inputData, stores.asJava, 
seedFactory)
+    val customers = (1 to nCustomers).foldLeft(Nil: 
List[Customer])((customers, _) => customerGenerator.generate() :: customers)
     println("...Done generating customers.")
 
-    println("Broadcasting stores and products...")
-    val storesBC = sc.broadcast(stores)
-    val productBC = sc.broadcast(inputData.getProductCategories())
+    println("Broadcasting products...")
+    val productBC = sc.broadcast(inputData.getProductCategories)
     val customerRDD = sc.parallelize(customers)
+    // This substitution into a constant value is a must, since it is referred 
to in an RDD transformation function.
+    // See BIGTOP-2148 for details.
     val simLen = simulationLength
     val nextSeed = seedFactory.getNextSeed()
-    println("...Done broadcasting stores and products.")
+    println("...Done broadcasting products.")
 
     println("Defining transaction DAG...")
 
     /**
-     *  See inline comments below regarding how we
-     *  generate TRANSACTION objects from CUSTOMERs.
+     * See inline comments below regarding how we
+     * generate TRANSACTION objects from CUSTOMERs.
      */
-    val transactionRDD = customerRDD.mapPartitionsWithIndex{
-      (index, custIter) =>
+    val transactionRDD = customerRDD.mapPartitionsWithIndex {
+      (index, customers) => {
         // Create a new RNG
         val seedFactory = new SeedFactory(nextSeed ^ index)
-        val transactionIter = custIter.map{
-        customer =>
-         val products = productBC.value
-          //Create a new purchasing profile.
-          val profileGen = new PurchasingProfileGenerator(products, 
seedFactory)
-          val profile = profileGen.generate()
-          val transGen = new TransactionGenerator(customer, profile, 
storesBC.value, products, seedFactory)
-          var transactions : List[Transaction] = List()
-         var transaction = transGen.generate()
-
-          //Create a list of this customer's transactions for the time period
-          while(transaction.getDateTime() < simLen) {
-            if (transaction.getDateTime > BURNIN_TIME) {
-              transactions = transaction :: transactions
-            }
-            transaction = transGen.generate()
+        val products = productBC.value
+        val profileGen = new PurchasingModelGenerator(products, seedFactory)
+        val profile = profileGen.generate()
+        val transactions = customers.map {
+          customer => {
+            // Create a new purchasing profile.
+            val transGen = new TransactionGenerator(customer, profile, 
products, seedFactory)
+            // Create a list of this customer's transactions for the time 
period
+            Iterator.continually(transGen.generate()).takeWhile(_.getDateTime 
< simLen).foldLeft(Nil: List[Transaction])(
+              (transactions, transaction) => if (transaction.getDateTime > 
BURNIN_TIME) transaction :: transactions else transactions
+            ).reverse
           }
-          //The final result, we return the list of transactions produced 
above.
-           transactions
         }
-      transactionIter
+        transactions
+      }
     }.flatMap(s => s)
 
     println("...Done defining transaction DAG.")
@@ -188,51 +173,39 @@ object SparkDriver {
     println(s"... Done Generating $nTrans transactions.")
 
     /**
-     *  Return the RDD representing all the petstore transactions.
-     *  This RDD contains a distributed collection of instances where
-     *  a customer went to a pet store, and bought a variable number of items.
-     *  We can then serialize all the contents to disk.
+     * Return the RDD representing all the petstore transactions.
+     * This RDD contains a distributed collection of instances where
+     * a customer went to a pet store, and bought a variable number of items.
+     * We can then serialize all the contents to disk.
      */
     transactionRDD
   }
 
-  def lineItem(t: Transaction, date:Date, p:Product): String = {
-      t.getStore.getId.toString + "," +
+  private def lineItem(t: Transaction, date: Date, p: Product): String = {
+    t.getStore.getId.toString + "," +
       t.getStore.getLocation.getZipcode + "," +
       t.getStore.getLocation.getCity + "," +
       t.getStore.getLocation.getState + "," +
       t.getCustomer.getId + "," +
-      t.getCustomer.getName.getFirst + "," +t.getCustomer.getName.getSecond + 
"," +
+      t.getCustomer.getName.getKey + "," +
+      t.getCustomer.getName.getValue + "," +
       t.getCustomer.getLocation.getZipcode + "," +
       t.getCustomer.getLocation.getCity + "," +
       t.getCustomer.getLocation.getState + "," +
       t.getId + "," +
       date + "," + p
   }
-  def writeData(transactionRDD : RDD[Transaction]): Unit = {
-    val initialDate : Long = new Date().getTime()
-
-    val transactionStringsRDD = transactionRDD.flatMap {
-      transaction =>
-        val products = transaction.getProducts()
-
-        /*********************************************************
-        * we define a "records" RDD : Which is a
-        * mapping of products from each single transaction to strings.
-        *
-        * So we ultimately define an RDD of strings, where each string 
represents
-        * an instance where of a item purchase.
-        * ********************************************************/
-        val records = products.asScala.map {
-          product =>
-            val storeLocation = transaction.getStore().getLocation()
-            // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec 
/ min * 1000 ms / sec
-            val dateMS = (transaction.getDateTime * 24.0 * 60.0 * 60.0 * 
1000.0).toLong
-            // Return a stringified "line item", which represents a single 
item bought.
-            lineItem(transaction, new Date(initialDate + dateMS), product)
-        }
 
-      records
+  def writeData(transactionRDD: RDD[Transaction]): Unit = {
+    val initialDate: Long = new Date().getTime
+
+    val transactionStringsRDD = transactionRDD.flatMap { transaction =>
+      transaction.getProducts.asScala.map { product =>
+        // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / 
min * 1000 ms / sec
+        val dateMS = (transaction.getDateTime * 24.0 * 60.0 * 60.0 * 
1000.0).toLong
+        // Return a stringified "line item", which represents a single item 
bought.
+        lineItem(transaction, new Date(initialDate + dateMS), product)
+      }
     }
     // Distributed serialization of the records to part-r-* files...
     transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index 92d5fe1ee..16faa7e2c 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -19,12 +19,11 @@ package org.apache.bigpetstore.spark
 
 import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
 import org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts
-import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils}
+import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, Statistics}
 import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters
 import org.apache.bigtop.bigpetstore.spark.etl.SparkETL
 import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver
-import org.apache.spark.SparkConf
-import org.apache.spark.SparkContext
+import org.apache.spark.sql.SparkSession
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
@@ -37,44 +36,44 @@ import java.nio.file.Files
 @RunWith(classOf[JUnitRunner])
 class TestFullPipeline extends AnyFunSuite with BeforeAndAfterAll {
 
-  val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
-  val sc = new SparkContext(conf)
+  private val spark = SparkSession.builder.appName("BPS Data Generator Test 
Suite").master("local[2]")
+    .config("spark.sql.legacy.timeParserPolicy", "LEGACY").getOrCreate()
 
   override def afterAll(): Unit = {
-    sc.stop()
+    spark.stop()
   }
 
   test("Full integration test.") {
 
     // First generate the data.
-    val tmpDir:File = 
Files.createTempDirectory("sparkDriverSuiteGeneratedData2").toFile()
+    val tmpDir: File = 
Files.createTempDirectory("sparkDriverSuiteGeneratedData2").toFile()
 
-    //stores, customers, days, randomSeed
-    val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0","123456789")
+    // stores, customers, days, randomSeed
+    val parameters: Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0", "123456789")
     SparkDriver.parseArgs(parameters)
 
-    val transactionRDD = SparkDriver.generateData(sc)
+    val transactionRDD = SparkDriver.generateData(spark.sparkContext)
     SparkDriver.writeData(transactionRDD)
 
-    //Now ETL the data
-    val etlDir:File = Files.createTempDirectory("BPSTest_ETL2").toFile()
-    System.out.println(etlDir.getAbsolutePath + "== "+etlDir.list())
+    // Now ETL the data
+    val etlDir: File = Files.createTempDirectory("BPSTest_ETL2").toFile()
+    System.out.println(etlDir.getAbsolutePath + "== " + etlDir.list())
 
-    val (locations,stores,customers,products,transactions) = SparkETL.run(sc, 
new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath))
+    val (locations, stores, customers, products, transactions) = 
SparkETL.run(spark, ETLParameters(tmpDir.getAbsolutePath, 
etlDir.getAbsolutePath))
 
-    // assert(locations==400L) TODO : This seems to vary (325,400,)
-    assert(stores==10L)
-    assert(customers==1000L)
-    assert(products==55L)
+    //assert(locations==400L) TODO : This seems to vary (325,400,)
+    assert(stores == 10L)
+    assert(customers == 1000L)
+    //assert(products==55L)
     //assert(transactions==45349L)
 
-    //Now do the analytics.
-    val analyticsJson = new File(tmpDir,"analytics.json")
+    // Now do the analytics.
+    val analyticsJson = new File(tmpDir, "analytics.json")
 
     PetStoreStatistics.run(etlDir.getAbsolutePath,
-      analyticsJson.getAbsolutePath, sc)
+      analyticsJson.getAbsolutePath, spark)
 
-    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson)
+    val stats: Statistics = IOUtils.readLocalAsStatistics(analyticsJson)
 
     /**
      * Assert some very generic features.  We will refine this later once
@@ -85,11 +84,11 @@ class TestFullPipeline extends AnyFunSuite with 
BeforeAndAfterAll {
     assert(stats.productDetails.length === products)
     assert(stats.transactionsByMonth.length === 12)
 
-    val recommJson = new File(tmpDir,"recommendations.json")
+    val recommJson = new File(tmpDir, "recommendations.json")
     RecommendProducts.run(etlDir.getAbsolutePath,
       recommJson.getAbsolutePath,
-      sc, nIterations=5)
+      spark, nIterations = 5)
 
-    sc.stop()
+    spark.stop()
   }
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
index 126ef16c8..b5b3e000d 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -20,14 +20,14 @@ package org.apache.bigtop.bigpetstore.spark.datamodel
 import java.nio.file.Files
 import java.util.Calendar
 import java.util.Locale
-
-import org.apache.spark.{SparkContext, SparkConf}
-
+import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatestplus.junit.JUnitRunner
 import org.junit.runner.RunWith
 
+import java.sql.Timestamp
+
 // hack for running tests with Gradle
 @RunWith(classOf[JUnitRunner])
 class IOUtilsSuite extends AnyFunSuite with BeforeAndAfterAll {
@@ -56,9 +56,9 @@ class IOUtilsSuite extends AnyFunSuite with BeforeAndAfterAll 
{
 
     val stores = Array(Store(1L, "11111"), Store(2L, "22222"), Store(3L, 
"11111"))
 
-    val txDate1 = Calendar.getInstance(Locale.US)
-    val txDate2 = Calendar.getInstance(Locale.US)
-    val txDate3 = Calendar.getInstance(Locale.US)
+    val txDate1 = new 
Timestamp(Calendar.getInstance(Locale.US).getTimeInMillis)
+    val txDate2 = new 
Timestamp(Calendar.getInstance(Locale.US).getTimeInMillis)
+    val txDate3 = new 
Timestamp(Calendar.getInstance(Locale.US).getTimeInMillis)
 
     val transactions = Array(
       Transaction(1L, 1L, 1L, txDate1, 1L),
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
index 70a3b354f..caa4e94a5 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
@@ -20,14 +20,14 @@ package org.apache.bigtop.bigpetstore.spark.etl
 import java.util.Calendar
 import java.util.Locale
 import java.util.TimeZone
-
-import org.apache.spark.{SparkContext, SparkConf}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatestplus.junit.JUnitRunner
 import org.junit.runner.RunWith
-
 import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.apache.spark.sql.SparkSession
+
+import java.sql.Timestamp
 
 /**
  * This class tests that, when we read records from the generator, the
@@ -43,34 +43,31 @@ import org.apache.bigtop.bigpetstore.spark.datamodel._
 @RunWith(classOf[JUnitRunner])
 class ETLSuite extends AnyFunSuite with BeforeAndAfterAll {
 
+  case class TransactionProduct(customerId: Long, transactionId: Long,
+                                storeId: Long, dateTime: Timestamp, product: 
String)
+
   /**
    * TODO : We are using Option monads as a replacement for nulls.
    * Lets move towards immutable spark context instead, if possible ?
    */
-  val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
-  val sc = new SparkContext(conf)
+  private val spark = SparkSession.builder.appName("BPS Data Generator Test 
Suite").master("local[*]").getOrCreate()
 
-  var rawRecords: Option[Array[(Store, Location, Customer, Location, 
TransactionProduct)]] = None
+  var rawRecords: Option[Array[RawData]] = None
   var transactions: Option[Array[Transaction]] = None
 
-  val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, 
"66067"))
-  val locations =
+  private val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, 
"66067"))
+  private val locations =
     Array(
       Location("11553", "Uniondale", "NY"),
       Location("98110", "Bainbridge Islan", "WA"),
       Location("66067", "Ottawa", "KS"),
       Location("20152", "Chantilly", "VA"))
-  val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152"))
-  val products =
+  private val customers = Array(Customer(999L, "Cesareo", "Lamplough", 
"20152"))
+  private val products =
     Array(
-      Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> 
"Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> 
"2.67")),
-      Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog 
Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
-      Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> 
"Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", 
"per_unit_cost" -> "2.14")))
-
-  val rawLines = Array(
-    "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 
03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & 
Potato;size=30.0;per_unit_cost=2.67;",
-    "1,98110,Bainbridge 
Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 
2015,category=poop bags;brand=Dog 
Days;color=Blue;size=60.0;per_unit_cost=0.21;",
-    "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 
04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & 
Rice;size=14.0;per_unit_cost=2.14;")
+      Product(2L, "dry dog food", Map("category" -> "dry dog food", "brand" -> 
"Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> 
"2.67")),
+      Product(3L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog 
Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
+      Product(1L, "dry cat food", Map("category" -> "dry cat food", "brand" -> 
"Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", 
"per_unit_cost" -> "2.14")))
 
   override def beforeAll(): Unit = {
 
@@ -79,83 +76,46 @@ class ETLSuite extends AnyFunSuite with BeforeAndAfterAll {
     val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), 
Locale.US)
 
     cal1.set(2015, 10, 3, 1, 8, 11)
-
     cal2.set(2015, 10, 2, 17, 51, 37)
-
     cal3.set(2015, 9, 12, 4, 29, 46)
 
+    val ts1 = new Timestamp(cal1.getTimeInMillis)
+    val ts2 = new Timestamp(cal2.getTimeInMillis)
+    val ts3 = new Timestamp(cal3.getTimeInMillis)
+
     rawRecords = Some(Array(
-      (stores(0), locations(0), customers(0), locations(3),
-        TransactionProduct(999L, 32L, 5L, cal1, "category=dry dog 
food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;")),
+      RawData(stores(0).storeId, stores(0).zipcode, locations(0).city, 
locations(0).state, customers(0).customerId,
+        customers(0).firstName, customers(0).lastName, customers(0).zipcode, 
locations(3).city, locations(3).state,
+        32L, ts1, "category=dry dog food;brand=Happy Pup;flavor=Fish & 
Potato;size=30.0;per_unit_cost=2.67;"),
 
-      (stores(1), locations(1), customers(0), locations(3),
-        TransactionProduct(999L, 31L, 1L, cal2, "category=poop bags;brand=Dog 
Days;color=Blue;size=60.0;per_unit_cost=0.21;")),
+      RawData(stores(1).storeId, stores(1).zipcode, locations(1).city, 
locations(1).state, customers(0).customerId,
+        customers(0).firstName, customers(0).lastName, customers(0).zipcode, 
locations(3).city, locations(3).state,
+        31L, ts2, "category=poop bags;brand=Dog 
Days;color=Blue;size=60.0;per_unit_cost=0.21;"),
 
-      (stores(2), locations(2), customers(0), locations(3),
-        TransactionProduct(999L, 30L, 6L, cal3, "category=dry cat 
food;brand=Feisty Feline;flavor=Chicken & 
Rice;size=14.0;per_unit_cost=2.14;"))))
+      RawData(stores(2).storeId, stores(2).zipcode, locations(2).city, 
locations(2).state, customers(0).customerId,
+        customers(0).firstName, customers(0).lastName, customers(0).zipcode, 
locations(3).city, locations(3).state,
+        30L, ts3, "category=dry cat food;brand=Feisty Feline;flavor=Chicken & 
Rice;size=14.0;per_unit_cost=2.14;"),
+    ))
 
     transactions = Some(Array(
-      Transaction(999L, 31L, 1L, cal2, 0L),
-      Transaction(999L, 30L, 6L, cal3, 2L),
-      Transaction(999L, 32L, 5L, cal1, 1L)))
+      Transaction(999L, 31L, 1L, ts2, 3L),
+      Transaction(999L, 30L, 6L, ts3, 1L),
+      Transaction(999L, 32L, 5L, ts1, 2L)))
   }
 
-
   override def afterAll(): Unit = {
-    sc.stop()
-  }
-
-  test("Parsing Generated Strings into Transaction Objects") {
-    val rawRDD = sc.parallelize(rawLines)
-    val expectedRecords = rawRecords.get
-
-    //Goal: Confirm that these RDD's are identical to the expected ones.
-    val rdd = SparkETL.parseRawData(rawRDD).collect()
-
-    /**
-     * Assumption: Order of RDD elements will be same as the mock records.
-     * This assumption seems to hold, but probably would break down if input 
size was large
-     * or running this test on distributed cluster.
-     */
-    for(i <- 0 to expectedRecords.length-1) {
-      val rawRecord = rdd(i)
-      val expectedRecord = expectedRecords(i)
-
-      //Store, Location, Customer, TransactionProduct
-      assert(rawRecord._1===expectedRecord._1)
-      assert(rawRecord._2===expectedRecord._2)
-      assert(rawRecord._3===expectedRecord._3)
-      assert(rawRecord._4===expectedRecord._4)
-
-      //Transaction
-      assert(rawRecord._5.customerId === expectedRecord._5.customerId)
-      assert(rawRecord._5.product === expectedRecord._5.product)
-      assert(rawRecord._5.storeId === expectedRecord._5.storeId)
-
-      //BIGTOP-1586 : We want granular assertions, and we don't care to 
compare millisecond timestamps.
-      assert(rawRecord._5.dateTime.get(Calendar.YEAR) === 
expectedRecord._5.dateTime.get(Calendar.YEAR))
-      assert(rawRecord._5.dateTime.get(Calendar.MONTH) === 
expectedRecord._5.dateTime.get(Calendar.MONTH))
-      assert(rawRecord._5.dateTime.get(Calendar.DAY_OF_MONTH) === 
expectedRecord._5.dateTime.get(Calendar.DAY_OF_MONTH))
-      assert(rawRecord._5.dateTime.get(Calendar.HOUR_OF_DAY) === 
expectedRecord._5.dateTime.get(Calendar.HOUR_OF_DAY))
-      assert(rawRecord._5.dateTime.get(Calendar.MINUTE) === 
expectedRecord._5.dateTime.get(Calendar.MINUTE))
-      assert(rawRecord._5.dateTime.get(Calendar.SECOND) === 
expectedRecord._5.dateTime.get(Calendar.SECOND))
-    }
-
+    spark.stop()
   }
 
   test("Generation of unique sets of transaction attributes") {
-    val rawRDD = sc.parallelize(rawRecords.get)
-    val rdds = SparkETL.normalizeData(rawRDD)
-    val locationRDD = rdds._1
-    val storeRDD = rdds._2
-    val customerRDD = rdds._3
-    val productRDD = rdds._4
-    val transactionRDD = rdds._5
-
-    assert(storeRDD.collect().toSet === stores.toSet)
-    assert(locationRDD.collect().toSet === locations.toSet)
-    assert(customerRDD.collect().toSet === customers.toSet)
-    assert(productRDD.collect().toSet === products.toSet)
-    assert(transactionRDD.collect().toSet === transactions.get.toSet)
+    val rawDF = spark.createDataFrame(rawRecords.get)
+    val (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD) = 
SparkETL.normalizeData(spark, rawDF)
+
+    import spark.implicits._
+    assert(storeRDD.as[Store].collect().toSet === stores.toSet)
+    assert(locationRDD.as[Location].collect().toSet === locations.toSet)
+    assert(customerRDD.as[Customer].collect().toSet === customers.toSet)
+    assert(productRDD.as[Product].collect().toSet === products.toSet)
+    assert(transactionRDD.as[Transaction].collect().toSet === 
transactions.get.toSet)
   }
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
index 6ab00c17f..8f6ee3267 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
@@ -30,22 +30,22 @@ import org.junit.runner.RunWith
 
 // hack for running tests with Gradle
 @RunWith(classOf[JUnitRunner])
-class SparkDriverSuite extends AnyFunSuite  with BeforeAndAfterAll {
+class SparkDriverSuite extends AnyFunSuite with BeforeAndAfterAll {
 
   val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
   val sc = new SparkContext(conf);
 
   override def afterAll(): Unit = {
-      sc.stop();
+    sc.stop();
   }
 
   /**
    * Run the test, return outputdir of the raw data.
    */
-  def runGenerator(sc:SparkContext) : File = {
-    val tmpDir:File = 
Files.createTempDirectory("sparkDriverSuiteGeneratedData").toFile()
+  def runGenerator(sc: SparkContext): File = {
+    val tmpDir: File = 
Files.createTempDirectory("sparkDriverSuiteGeneratedData").toFile()
     // 10 stores, 1000 customers, 365.0 days
-    val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0")
+    val parameters: Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0")
 
     SparkDriver.parseArgs(parameters)
 
@@ -54,14 +54,12 @@ class SparkDriverSuite extends AnyFunSuite  with 
BeforeAndAfterAll {
     assert(transactionCount > 0)
 
     SparkDriver.writeData(transactionRDD)
-    tmpDir;
-
+    tmpDir
   }
 
   test("Generating data") {
-
-    val tmpDir:File =runGenerator(sc);
-    val transactionDir:File = new File(tmpDir, "transactions")
+    val tmpDir: File = runGenerator(sc);
+    val transactionDir: File = new File(tmpDir, "transactions")
     assert(transactionDir.exists())
     assert(transactionDir.isDirectory())
     //TODO : Assert format is TextFile
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
index 7a0d9afb7..79ed2984c 100644
--- a/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
@@ -50,6 +50,6 @@ dependencies {
     compile 'commons-io:commons-io:2.18.0'
     compile 'org.apache.httpcomponents:httpclient:4.5.14'
     compile 'org.apache.commons:commons-lang3:3.17.0'
-    compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
+    compile "org.apache.bigtop:bigpetstore-data-generator:3.5.0-SNAPSHOT"
     testCompile group: 'junit', name: 'junit', version: '4.13.2'
 }
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
index f03827b66..917a6a2c6 100644
--- 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
@@ -1,6 +1,6 @@
 package org.apache.bigtop.bigpetstore.qstream;
 
-import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+import org.apache.bigtop.datagenerators.bigpetstore.datamodels.Transaction;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -39,7 +39,7 @@ public class FileLoadGen extends LoadGen{
         }
 
         /**
-         * Write queue.   Every 5 seconds, write
+         * Write queue.
          */
         final LinkedBlockingQueue<Transaction> transactionQueue = new 
LinkedBlockingQueue<Transaction>(getQueueSize());
         new Thread(){
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
index 84e574fd9..3b8db5526 100644
--- 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
@@ -16,7 +16,7 @@
  */
 package org.apache.bigtop.bigpetstore.qstream;
 
-import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+import org.apache.bigtop.datagenerators.bigpetstore.datamodels.Transaction;
 import org.apache.http.HttpResponse;
 
 import java.net.URL;
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
index f627b3637..a60d0fe2f 100644
--- 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
@@ -17,11 +17,12 @@
 
 package org.apache.bigtop.bigpetstore.qstream;
 
-import com.github.rnowling.bps.datagenerator.datamodels.inputs.InputData;
-import com.github.rnowling.bps.datagenerator.datamodels.inputs.ProductCategory;
-import com.github.rnowling.bps.datagenerator.datamodels.*;
-import com.github.rnowling.bps.datagenerator.*;
-import com.github.rnowling.bps.datagenerator.framework.SeedFactory;
+import 
org.apache.bigtop.datagenerators.bigpetstore.datamodels.inputs.InputData;
+import 
org.apache.bigtop.datagenerators.bigpetstore.datamodels.inputs.ProductCategory;
+import org.apache.bigtop.datagenerators.bigpetstore.datamodels.*;
+import 
org.apache.bigtop.datagenerators.bigpetstore.generators.purchase.PurchasingModel;
+import org.apache.bigtop.datagenerators.bigpetstore.*;
+import org.apache.bigtop.datagenerators.samplers.SeedFactory;
 import com.google.common.collect.Lists;
 
 import java.util.*;
@@ -46,7 +47,7 @@ public abstract class LoadGen {
         return 100*1000;
     }
 
-    public abstract LinkedBlockingQueue<Transaction> startWriteQueue(int 
qSize);
+    public abstract LinkedBlockingQueue<Transaction> startWriteQueue(int 
milliseconds);
 
     public static boolean TESTING=false;
 
@@ -95,10 +96,9 @@ public abstract class LoadGen {
     public static void main(String[] args){
         try {
             LoadGen lg = LoadGenFactory.parseArgs(args);
-            long start=System.currentTimeMillis();
             int runs = 0;
-            //write everything to /tmp, every 20 seconds.
-            LinkedBlockingQueue<Transaction> q = lg.startWriteQueue(10000);
+            //write everything to /tmp, every second.
+            LinkedBlockingQueue<Transaction> q = lg.startWriteQueue(1000);
             while(true){
                 lg.iterateData(q, System.currentTimeMillis());
                 runs++;
@@ -152,8 +152,8 @@ public abstract class LoadGen {
         if(! custIter.hasNext())
             throw new RuntimeException("No customer data ");
         //Create a new purchasing profile.
-        PurchasingProfileGenerator profileGen = new 
PurchasingProfileGenerator(products, seedFactory);
-        PurchasingProfile profile = profileGen.generate();
+        PurchasingModelGenerator profileGen = new 
PurchasingModelGenerator(products, seedFactory);
+        PurchasingModel profile = profileGen.generate();
 
         /** Stop either if
         * 1) the queue is full
@@ -162,7 +162,7 @@ public abstract class LoadGen {
         while(queue.remainingCapacity()>0 && custIter.hasNext()){
             Customer cust = custIter.next();
             int transactionsForThisCustomer = 0;
-            TransactionGenerator transGen = new TransactionGenerator(cust, 
profile, stores, products, seedFactory);
+            TransactionGenerator transGen = new TransactionGenerator(cust, 
profile, products, seedFactory);
             Transaction trC = transGen.generate();
             while(trC.getDateTime()<simulationLength) {
                 queue.put(trC);
@@ -171,4 +171,4 @@ public abstract class LoadGen {
         }
 
     }
-}
\ No newline at end of file
+}
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
index 78da7afdc..c118fe616 100644
--- 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
@@ -1,6 +1,6 @@
 package org.apache.bigtop.bigpetstore.qstream;
 
-import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+import org.apache.bigtop.datagenerators.bigpetstore.datamodels.Transaction;
 import org.apache.http.HttpResponse;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.HttpClient;
@@ -93,4 +93,4 @@ public class Utils {
         return var2.toString();
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/DataLoader.java
 
b/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/DataLoader.java
index e8a4023e7..4618b84f5 100644
--- 
a/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/DataLoader.java
+++ 
b/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/DataLoader.java
@@ -26,7 +26,7 @@ public class DataLoader
        public InputData loadData() throws Exception
        {
                List<Location> locations = new LocationReader().readData();
-               InputData inputData = new InputData(locations);
+               InputData inputData = new InputData(locations, new 
ProductGenerator(Constants.PRODUCTS_COLLECTION).generate());
 
                return inputData;
        }
diff --git 
a/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/datamodels/inputs/InputData.java
 
b/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/datamodels/inputs/InputData.java
index 074ea7ef2..879414ec5 100644
--- 
a/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/datamodels/inputs/InputData.java
+++ 
b/bigtop-data-generators/bigpetstore-data-generator/src/main/java/org/apache/bigtop/datagenerators/bigpetstore/datamodels/inputs/InputData.java
@@ -16,9 +16,12 @@
 package org.apache.bigtop.datagenerators.bigpetstore.datamodels.inputs;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.bigtop.datagenerators.bigpetstore.Constants;
+import org.apache.bigtop.datagenerators.bigpetstore.ProductGenerator;
 import org.apache.bigtop.datagenerators.locations.Location;
 
 public class InputData implements Serializable
@@ -26,14 +29,26 @@ public class InputData implements Serializable
        private static final long serialVersionUID = 9078989799806707788L;
 
        List<Location> zipcodeTable;
+       Collection<ProductCategory> productCategories;
 
        public InputData(List<Location> zipcodeTable)
+       {
+               this(zipcodeTable, new 
ProductGenerator(Constants.PRODUCTS_COLLECTION).generate());
+       }
+
+       public InputData(List<Location> zipcodeTable, 
Collection<ProductCategory> productCategories)
        {
                this.zipcodeTable = Collections.unmodifiableList(zipcodeTable);
+               this.productCategories = 
Collections.unmodifiableCollection(productCategories);
        }
 
        public List<Location> getZipcodeTable()
        {
                return zipcodeTable;
        }
+
+       public Collection<ProductCategory> getProductCategories()
+       {
+               return productCategories;
+       }
 }
diff --git a/bigtop-data-generators/build.gradle 
b/bigtop-data-generators/build.gradle
index 17baffb1a..aaa0e3959 100644
--- a/bigtop-data-generators/build.gradle
+++ b/bigtop-data-generators/build.gradle
@@ -17,6 +17,15 @@
 subprojects {
   apply plugin: 'eclipse'
   apply plugin: 'java'
+  apply plugin: 'maven-publish'
+
+  publishing {
+    publications {
+      mavenJava(MavenPublication) {
+        artifact jar
+      }
+    }
+  }
 
   Node xml = new XmlParser().parse("../pom.xml")
   group = xml.groupId.first().value().first()

Reply via email to