Hi Spark community,

I'm very new to the Apache Spark community, but if this (very active) group is anything like the other Apache project user groups I've worked with, I'm going to enjoy the discussions here very much. Thanks in advance!

Use Case: I am trying to go from flat files of user response data, to contingency tables of frequency counts, to Pearson Chi-Square statistics, and then perform a Chi-Squared hypothesis test on each table. The user response data is in a multiple choice question-answer (MCQ) format. The goal is to compute a contingency table for every choose-two combination of questions, per precondition: (precondition, question X question). Each cell of a contingency table is the number of users in the intersection of those who chose a given option of the first question and those who chose a given option of the second question.

An overview of the problem:

  // data ingestion and typing schema: Observation (u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)
  // a question (q) has a finite set of response options (v) per which a user (u) responds
  // additional response fields are not required per this test
  for (precondition a) {
    for (q_i in lex ordered questions) {
      for (q_j in lex ordered questions, q_j > q_i) {
        forall v_k \in q_i get set of distinct users {u}_ik
        forall v_l \in q_j get set of distinct users {u}_jl
        forall cells per table (a,q_i,q_j) defn C_ijkl = |intersect({u}_ik, {u}_jl)| // contingency table construct
        compute chisq test per this contingency table and persist
      }
    }
  }

The Scala main I'm testing is provided below. I was planning to use the provided example at https://spark.apache.org/docs/1.3.1/mllib-statistics.html; however, I am not sure how to go from my RDD[Observation] to the RDD[Vector] that the example expects as input.

  import org.apache.spark.{SparkConf, SparkContext}

  // data typing schema; declared at top level (not inside main) so Spark can serialize it cleanly
  case class Observation (u: String, d: java.util.Date, t: String, q: String, v: String, a: Int)

  object TestMain {
    def main(args: Array[String]): Unit = {
      // setup main space for test
      val conf = new SparkConf().setAppName("TestMain")
      val sc = new SparkContext(conf)

      // data ETL
      val date_format = new java.text.SimpleDateFormat("yyyyMMdd")
      val data_project_abs_dir = "/my/path/to/data/files"
      val data_files = data_project_abs_dir + "/part-*.gz"
      val data = sc.textFile(data_files)

      // split each CSV line into trimmed fields, then type them
      val observations = data
        .map(line => line.split(",").map(_.trim))
        .map(r => Observation(r(0), date_format.parse(r(1)), r(2), r(3), r(4), r(5).toInt))
      observations.cache()

      // ToDo: the basic keying of the space, possibly...
      // distinct ((precondition, question, option), user) pairs
      val qvu = observations.map(o => ((o.a, o.q, o.v), o.u)).distinct()

      // ToDo: ok, so now how to get this into the precondition RDD[Vector] from the Spark example to make a contingency table?...
      // ToDo: perform then persist the resulting chisq and p-value on these contingency tables...
    }
  }

Any help is appreciated. Thanks!

-Dan
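
P.S. To make the target concrete, here is a minimal sketch of one contingency table and its Pearson statistic on plain in-memory Scala collections (no Spark). It follows the pseudocode above for a single (a, q_i, q_j) triple. The object name, the helper names (usersByOption, table, pearsonChiSq) and the toy responses are all made up for illustration; this is not a claim about how it should be done at scale.

```scala
// Toy, in-memory sketch (no Spark): one contingency table for a single
// (a, q_i, q_j) triple. All names and data here are hypothetical.
object ContingencySketch {
  // (question, option, user) triples, assumed already filtered to one precondition a
  val responses: Seq[(String, String, String)] = Seq(
    ("q1", "A", "u1"), ("q1", "A", "u2"), ("q1", "A", "u3"), ("q1", "B", "u4"),
    ("q2", "X", "u1"), ("q2", "X", "u2"), ("q2", "Y", "u3"), ("q2", "Y", "u4"),
    ("q2", "X", "u1") // duplicate response, collapsed by toSet below
  )

  // option -> set of distinct users who chose it, for one question
  def usersByOption(q: String): Map[String, Set[String]] =
    responses.filter(_._1 == q)
      .groupBy(_._2)
      .map { case (v, rs) => v -> rs.map(_._3).toSet }

  // C(k)(l) = |intersect({u}_ik, {u}_jl)|, options in lexicographic order
  def table(qi: String, qj: String): Array[Array[Double]] = {
    val rows = usersByOption(qi).toSeq.sortBy(_._1)
    val cols = usersByOption(qj).toSeq.sortBy(_._1)
    rows.map { case (_, ui) =>
      cols.map { case (_, uj) => (ui intersect uj).size.toDouble }.toArray
    }.toArray
  }

  // Pearson chi-square statistic: sum over cells of (O - E)^2 / E,
  // where E = rowTotal * colTotal / grandTotal; cells with E = 0 are skipped
  def pearsonChiSq(c: Array[Array[Double]]): Double = {
    val rowTot = c.map(_.sum)
    val colTot = c.transpose.map(_.sum)
    val n = rowTot.sum
    (for {
      k <- c.indices
      l <- c(k).indices
      e = rowTot(k) * colTot(l) / n
      if e > 0
    } yield math.pow(c(k)(l) - e, 2) / e).sum
  }

  def main(args: Array[String]): Unit = {
    val c = table("q1", "q2")
    c.foreach(row => println(row.mkString(" ")))
    println(s"chi-square = ${pearsonChiSq(c)}")
  }
}
```

The test for such a table would have (rows - 1) * (cols - 1) degrees of freedom. If I am reading the linked MLlib page correctly, Statistics.chiSqTest also accepts a local Matrix as a contingency table, so perhaps each table could be built with Matrices.dense (column-major values) instead of an RDD[Vector]; I have not tried this yet.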